1use std::fmt::Debug;
4use std::io::{self, Cursor};
5use std::marker::PhantomData;
6use std::net::SocketAddr;
7
8use azalea_auth::game_profile::GameProfile;
9use azalea_auth::sessionserver::{ClientSessionServerError, ServerSessionServerError};
10use azalea_crypto::{Aes128CfbDec, Aes128CfbEnc};
11use thiserror::Error;
12use tokio::io::{AsyncWriteExt, BufStream};
13use tokio::net::TcpStream;
14use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf, ReuniteError};
15use tracing::{error, info};
16use uuid::Uuid;
17
18use crate::packets::ProtocolPacket;
19use crate::packets::config::{ClientboundConfigPacket, ServerboundConfigPacket};
20use crate::packets::game::{ClientboundGamePacket, ServerboundGamePacket};
21use crate::packets::handshake::{ClientboundHandshakePacket, ServerboundHandshakePacket};
22use crate::packets::login::c_hello::ClientboundHello;
23use crate::packets::login::{ClientboundLoginPacket, ServerboundLoginPacket};
24use crate::packets::status::{ClientboundStatusPacket, ServerboundStatusPacket};
25use crate::read::{ReadPacketError, deserialize_packet, read_raw_packet, try_read_raw_packet};
26use crate::write::{serialize_packet, write_raw_packet};
27
28pub struct RawReadConnection {
29 pub read_stream: OwnedReadHalf,
30 pub buffer: Cursor<Vec<u8>>,
31 pub compression_threshold: Option<u32>,
32 pub dec_cipher: Option<Aes128CfbDec>,
33}
34
35pub struct RawWriteConnection {
36 pub write_stream: OwnedWriteHalf,
37 pub compression_threshold: Option<u32>,
38 pub enc_cipher: Option<Aes128CfbEnc>,
39}
40
41pub struct ReadConnection<R: ProtocolPacket> {
43 pub raw: RawReadConnection,
44 _reading: PhantomData<R>,
45}
46
47pub struct WriteConnection<W: ProtocolPacket> {
49 pub raw: RawWriteConnection,
50 _writing: PhantomData<W>,
51}
52
53pub struct Connection<R: ProtocolPacket, W: ProtocolPacket> {
132 pub reader: ReadConnection<R>,
133 pub writer: WriteConnection<W>,
134}
135
136impl RawReadConnection {
137 pub async fn read(&mut self) -> Result<Box<[u8]>, Box<ReadPacketError>> {
138 read_raw_packet::<_>(
139 &mut self.read_stream,
140 &mut self.buffer,
141 self.compression_threshold,
142 &mut self.dec_cipher,
143 )
144 .await
145 }
146
147 pub fn try_read(&mut self) -> Result<Option<Box<[u8]>>, Box<ReadPacketError>> {
148 try_read_raw_packet::<_>(
149 &mut self.read_stream,
150 &mut self.buffer,
151 self.compression_threshold,
152 &mut self.dec_cipher,
153 )
154 }
155}
156
157impl RawWriteConnection {
158 pub async fn write(&mut self, packet: &[u8]) -> io::Result<()> {
159 if let Err(e) = write_raw_packet(
160 packet,
161 &mut self.write_stream,
162 self.compression_threshold,
163 &mut self.enc_cipher,
164 )
165 .await
166 {
167 if e.kind() == io::ErrorKind::BrokenPipe {
169 info!("Broken pipe, shutting down connection.");
170 if let Err(e) = self.shutdown().await {
171 error!("Couldn't shut down: {}", e);
172 }
173 }
174 return Err(e);
175 }
176 Ok(())
177 }
178
179 pub async fn shutdown(&mut self) -> io::Result<()> {
181 self.write_stream.shutdown().await
182 }
183}
184
185impl<R> ReadConnection<R>
186where
187 R: ProtocolPacket + Debug,
188{
189 pub async fn read(&mut self) -> Result<R, Box<ReadPacketError>> {
191 let raw_packet = self.raw.read().await?;
192 deserialize_packet(&mut Cursor::new(&raw_packet))
193 }
194
195 pub fn try_read(&mut self) -> Result<Option<R>, Box<ReadPacketError>> {
198 let Some(raw_packet) = self.raw.try_read()? else {
199 return Ok(None);
200 };
201 Ok(Some(deserialize_packet(&mut Cursor::new(&raw_packet))?))
202 }
203}
204impl<W> WriteConnection<W>
205where
206 W: ProtocolPacket + Debug,
207{
208 pub async fn write(&mut self, packet: W) -> io::Result<()> {
210 self.raw.write(&serialize_packet(&packet).unwrap()).await
211 }
212
213 pub async fn shutdown(&mut self) -> io::Result<()> {
215 self.raw.shutdown().await
216 }
217}
218
219impl<R, W> Connection<R, W>
220where
221 R: ProtocolPacket + Debug,
222 W: ProtocolPacket + Debug,
223{
224 pub async fn read(&mut self) -> Result<R, Box<ReadPacketError>> {
226 self.reader.read().await
227 }
228
229 pub fn try_read(&mut self) -> Result<Option<R>, Box<ReadPacketError>> {
232 self.reader.try_read()
233 }
234
235 pub async fn write(&mut self, packet: impl crate::packets::Packet<W>) -> io::Result<()> {
237 let packet = packet.into_variant();
238 self.writer.write(packet).await
239 }
240
241 #[must_use]
243 pub fn into_split(self) -> (ReadConnection<R>, WriteConnection<W>) {
244 (self.reader, self.writer)
245 }
246
247 #[must_use]
252 pub fn into_split_raw(self) -> (RawReadConnection, RawWriteConnection) {
253 (self.reader.raw, self.writer.raw)
254 }
255}
256
257#[derive(Error, Debug)]
258pub enum ConnectionError {
259 #[error("{0}")]
260 Io(#[from] io::Error),
261}
262
263use socks5_impl::protocol::UserKey;
264
265#[derive(Debug, Clone)]
266pub struct Proxy {
267 pub addr: SocketAddr,
268 pub auth: Option<UserKey>,
269}
270
271impl Proxy {
272 pub fn new(addr: SocketAddr, auth: Option<UserKey>) -> Self {
273 Self { addr, auth }
274 }
275}
276
277impl Connection<ClientboundHandshakePacket, ServerboundHandshakePacket> {
278 pub async fn new(address: &SocketAddr) -> Result<Self, ConnectionError> {
280 let stream = TcpStream::connect(address).await?;
281
282 stream.set_nodelay(true)?;
284
285 Self::new_from_stream(stream).await
286 }
287
288 pub async fn new_with_proxy(
291 address: &SocketAddr,
292 proxy: Proxy,
293 ) -> Result<Self, ConnectionError> {
294 let proxy_stream = TcpStream::connect(proxy.addr).await?;
295 let mut stream = BufStream::new(proxy_stream);
296
297 let _ = socks5_impl::client::connect(&mut stream, address, proxy.auth)
298 .await
299 .map_err(io::Error::other)?;
300
301 Self::new_from_stream(stream.into_inner()).await
302 }
303
304 pub async fn new_from_stream(stream: TcpStream) -> Result<Self, ConnectionError> {
307 let (read_stream, write_stream) = stream.into_split();
308
309 Ok(Connection {
310 reader: ReadConnection {
311 raw: RawReadConnection {
312 read_stream,
313 buffer: Cursor::new(Vec::new()),
314 compression_threshold: None,
315 dec_cipher: None,
316 },
317 _reading: PhantomData,
318 },
319 writer: WriteConnection {
320 raw: RawWriteConnection {
321 write_stream,
322 compression_threshold: None,
323 enc_cipher: None,
324 },
325 _writing: PhantomData,
326 },
327 })
328 }
329
330 #[must_use]
333 pub fn login(self) -> Connection<ClientboundLoginPacket, ServerboundLoginPacket> {
334 Connection::from(self)
335 }
336
337 #[must_use]
340 pub fn status(self) -> Connection<ClientboundStatusPacket, ServerboundStatusPacket> {
341 Connection::from(self)
342 }
343}
344
345impl Connection<ClientboundLoginPacket, ServerboundLoginPacket> {
346 pub fn set_compression_threshold(&mut self, threshold: i32) {
350 if threshold >= 0 {
352 self.reader.raw.compression_threshold = Some(threshold as u32);
353 self.writer.raw.compression_threshold = Some(threshold as u32);
354 } else {
355 self.reader.raw.compression_threshold = None;
356 self.writer.raw.compression_threshold = None;
357 }
358 }
359
360 pub fn set_encryption_key(&mut self, key: [u8; 16]) {
363 let (enc_cipher, dec_cipher) = azalea_crypto::create_cipher(&key);
364 self.reader.raw.dec_cipher = Some(dec_cipher);
365 self.writer.raw.enc_cipher = Some(enc_cipher);
366 }
367
368 #[must_use]
371 pub fn config(self) -> Connection<ClientboundConfigPacket, ServerboundConfigPacket> {
372 Connection::from(self)
373 }
374
375 pub async fn authenticate(
427 &self,
428 access_token: &str,
429 uuid: &Uuid,
430 private_key: [u8; 16],
431 packet: &ClientboundHello,
432 ) -> Result<(), ClientSessionServerError> {
433 azalea_auth::sessionserver::join(
434 access_token,
435 &packet.public_key,
436 &private_key,
437 uuid,
438 &packet.server_id,
439 )
440 .await
441 }
442}
443
444impl Connection<ServerboundHandshakePacket, ClientboundHandshakePacket> {
445 #[must_use]
448 pub fn login(self) -> Connection<ServerboundLoginPacket, ClientboundLoginPacket> {
449 Connection::from(self)
450 }
451
452 #[must_use]
455 pub fn status(self) -> Connection<ServerboundStatusPacket, ClientboundStatusPacket> {
456 Connection::from(self)
457 }
458}
459
460impl Connection<ServerboundLoginPacket, ClientboundLoginPacket> {
461 pub fn set_compression_threshold(&mut self, threshold: i32) {
465 if threshold >= 0 {
467 self.reader.raw.compression_threshold = Some(threshold as u32);
468 self.writer.raw.compression_threshold = Some(threshold as u32);
469 } else {
470 self.reader.raw.compression_threshold = None;
471 self.writer.raw.compression_threshold = None;
472 }
473 }
474
475 pub fn set_encryption_key(&mut self, key: [u8; 16]) {
478 let (enc_cipher, dec_cipher) = azalea_crypto::create_cipher(&key);
479 self.reader.raw.dec_cipher = Some(dec_cipher);
480 self.writer.raw.enc_cipher = Some(enc_cipher);
481 }
482
483 #[must_use]
486 pub fn game(self) -> Connection<ServerboundGamePacket, ClientboundGamePacket> {
487 Connection::from(self)
488 }
489
490 pub async fn authenticate(
494 &self,
495 username: &str,
496 public_key: &[u8],
497 private_key: &[u8; 16],
498 ip: Option<&str>,
499 ) -> Result<GameProfile, ServerSessionServerError> {
500 azalea_auth::sessionserver::serverside_auth(username, public_key, private_key, ip).await
501 }
502
503 #[must_use]
505 pub fn config(self) -> Connection<ServerboundConfigPacket, ClientboundConfigPacket> {
506 Connection::from(self)
507 }
508}
509
510impl Connection<ServerboundConfigPacket, ClientboundConfigPacket> {
511 #[must_use]
514 pub fn game(self) -> Connection<ServerboundGamePacket, ClientboundGamePacket> {
515 Connection::from(self)
516 }
517}
518
519impl Connection<ClientboundConfigPacket, ServerboundConfigPacket> {
520 #[must_use]
523 pub fn game(self) -> Connection<ClientboundGamePacket, ServerboundGamePacket> {
524 Connection::from(self)
525 }
526}
527
528impl Connection<ClientboundGamePacket, ServerboundGamePacket> {
529 #[must_use]
531 pub fn config(self) -> Connection<ClientboundConfigPacket, ServerboundConfigPacket> {
532 Connection::from(self)
533 }
534}
535impl Connection<ServerboundGamePacket, ClientboundGamePacket> {
536 #[must_use]
538 pub fn config(self) -> Connection<ServerboundConfigPacket, ClientboundConfigPacket> {
539 Connection::from(self)
540 }
541}
542
543impl<R1, W1> Connection<R1, W1>
546where
547 R1: ProtocolPacket + Debug,
548 W1: ProtocolPacket + Debug,
549{
550 #[must_use]
553 pub fn from<R2, W2>(connection: Connection<R1, W1>) -> Connection<R2, W2>
554 where
555 R2: ProtocolPacket + Debug,
556 W2: ProtocolPacket + Debug,
557 {
558 Connection {
559 reader: ReadConnection {
560 raw: connection.reader.raw,
561 _reading: PhantomData,
562 },
563 writer: WriteConnection {
564 raw: connection.writer.raw,
565 _writing: PhantomData,
566 },
567 }
568 }
569
570 pub fn wrap(stream: TcpStream) -> Connection<R1, W1> {
572 let (read_stream, write_stream) = stream.into_split();
573
574 Connection {
575 reader: ReadConnection {
576 raw: RawReadConnection {
577 read_stream,
578 buffer: Cursor::new(Vec::new()),
579 compression_threshold: None,
580 dec_cipher: None,
581 },
582 _reading: PhantomData,
583 },
584 writer: WriteConnection {
585 raw: RawWriteConnection {
586 write_stream,
587 compression_threshold: None,
588 enc_cipher: None,
589 },
590 _writing: PhantomData,
591 },
592 }
593 }
594
595 pub fn unwrap(self) -> Result<TcpStream, ReuniteError> {
597 self.reader
598 .raw
599 .read_stream
600 .reunite(self.writer.raw.write_stream)
601 }
602}