1use std::fmt::Debug;
4use std::io::Cursor;
5use std::marker::PhantomData;
6use std::net::SocketAddr;
7
8use azalea_auth::game_profile::GameProfile;
9use azalea_auth::sessionserver::{ClientSessionServerError, ServerSessionServerError};
10use azalea_crypto::{Aes128CfbDec, Aes128CfbEnc};
11use thiserror::Error;
12use tokio::io::{AsyncWriteExt, BufStream};
13use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf, ReuniteError};
14use tokio::net::TcpStream;
15use tracing::{error, info};
16use uuid::Uuid;
17
18use crate::packets::config::{ClientboundConfigPacket, ServerboundConfigPacket};
19use crate::packets::game::{ClientboundGamePacket, ServerboundGamePacket};
20use crate::packets::handshake::{ClientboundHandshakePacket, ServerboundHandshakePacket};
21use crate::packets::login::c_hello::ClientboundHello;
22use crate::packets::login::{ClientboundLoginPacket, ServerboundLoginPacket};
23use crate::packets::status::{ClientboundStatusPacket, ServerboundStatusPacket};
24use crate::packets::ProtocolPacket;
25use crate::read::{deserialize_packet, read_raw_packet, try_read_raw_packet, ReadPacketError};
26use crate::write::{serialize_packet, write_raw_packet};
27
28pub struct RawReadConnection {
29 pub read_stream: OwnedReadHalf,
30 pub buffer: Cursor<Vec<u8>>,
31 pub compression_threshold: Option<u32>,
32 pub dec_cipher: Option<Aes128CfbDec>,
33}
34
35pub struct RawWriteConnection {
36 pub write_stream: OwnedWriteHalf,
37 pub compression_threshold: Option<u32>,
38 pub enc_cipher: Option<Aes128CfbEnc>,
39}
40
41pub struct ReadConnection<R: ProtocolPacket> {
43 pub raw: RawReadConnection,
44 _reading: PhantomData<R>,
45}
46
47pub struct WriteConnection<W: ProtocolPacket> {
49 pub raw: RawWriteConnection,
50 _writing: PhantomData<W>,
51}
52
53pub struct Connection<R: ProtocolPacket, W: ProtocolPacket> {
132 pub reader: ReadConnection<R>,
133 pub writer: WriteConnection<W>,
134}
135
136impl RawReadConnection {
137 pub async fn read(&mut self) -> Result<Box<[u8]>, Box<ReadPacketError>> {
138 read_raw_packet::<_>(
139 &mut self.read_stream,
140 &mut self.buffer,
141 self.compression_threshold,
142 &mut self.dec_cipher,
143 )
144 .await
145 }
146
147 pub fn try_read(&mut self) -> Result<Option<Box<[u8]>>, Box<ReadPacketError>> {
148 try_read_raw_packet::<_>(
149 &mut self.read_stream,
150 &mut self.buffer,
151 self.compression_threshold,
152 &mut self.dec_cipher,
153 )
154 }
155}
156
157impl RawWriteConnection {
158 pub async fn write(&mut self, packet: &[u8]) -> std::io::Result<()> {
159 if let Err(e) = write_raw_packet(
160 packet,
161 &mut self.write_stream,
162 self.compression_threshold,
163 &mut self.enc_cipher,
164 )
165 .await
166 {
167 if e.kind() == std::io::ErrorKind::BrokenPipe {
169 info!("Broken pipe, shutting down connection.");
170 if let Err(e) = self.shutdown().await {
171 error!("Couldn't shut down: {}", e);
172 }
173 }
174 return Err(e);
175 }
176 Ok(())
177 }
178
179 pub async fn shutdown(&mut self) -> std::io::Result<()> {
181 self.write_stream.shutdown().await
182 }
183}
184
185impl<R> ReadConnection<R>
186where
187 R: ProtocolPacket + Debug,
188{
189 pub async fn read(&mut self) -> Result<R, Box<ReadPacketError>> {
191 let raw_packet = self.raw.read().await?;
192 deserialize_packet(&mut Cursor::new(&raw_packet))
193 }
194
195 pub fn try_read(&mut self) -> Result<Option<R>, Box<ReadPacketError>> {
198 let Some(raw_packet) = self.raw.try_read()? else {
199 return Ok(None);
200 };
201 Ok(Some(deserialize_packet(&mut Cursor::new(&raw_packet))?))
202 }
203}
204impl<W> WriteConnection<W>
205where
206 W: ProtocolPacket + Debug,
207{
208 pub async fn write(&mut self, packet: W) -> std::io::Result<()> {
210 self.raw.write(&serialize_packet(&packet).unwrap()).await
211 }
212
213 pub async fn shutdown(&mut self) -> std::io::Result<()> {
215 self.raw.shutdown().await
216 }
217}
218
219impl<R, W> Connection<R, W>
220where
221 R: ProtocolPacket + Debug,
222 W: ProtocolPacket + Debug,
223{
224 pub async fn read(&mut self) -> Result<R, Box<ReadPacketError>> {
226 self.reader.read().await
227 }
228
229 pub fn try_read(&mut self) -> Result<Option<R>, Box<ReadPacketError>> {
232 self.reader.try_read()
233 }
234
235 pub async fn write(&mut self, packet: impl crate::packets::Packet<W>) -> std::io::Result<()> {
237 let packet = packet.into_variant();
238 self.writer.write(packet).await
239 }
240
241 #[must_use]
243 pub fn into_split(self) -> (ReadConnection<R>, WriteConnection<W>) {
244 (self.reader, self.writer)
245 }
246}
247
248#[derive(Error, Debug)]
249pub enum ConnectionError {
250 #[error("{0}")]
251 Io(#[from] std::io::Error),
252}
253
254use socks5_impl::protocol::UserKey;
255
256#[derive(Debug, Clone)]
257pub struct Proxy {
258 pub addr: SocketAddr,
259 pub auth: Option<UserKey>,
260}
261
262impl Proxy {
263 pub fn new(addr: SocketAddr, auth: Option<UserKey>) -> Self {
264 Self { addr, auth }
265 }
266}
267
268impl Connection<ClientboundHandshakePacket, ServerboundHandshakePacket> {
269 pub async fn new(address: &SocketAddr) -> Result<Self, ConnectionError> {
271 let stream = TcpStream::connect(address).await?;
272
273 stream.set_nodelay(true)?;
275
276 Self::new_from_stream(stream).await
277 }
278
279 pub async fn new_with_proxy(
282 address: &SocketAddr,
283 proxy: Proxy,
284 ) -> Result<Self, ConnectionError> {
285 let proxy_stream = TcpStream::connect(proxy.addr).await?;
286 let mut stream = BufStream::new(proxy_stream);
287
288 let _ = socks5_impl::client::connect(&mut stream, address, proxy.auth)
289 .await
290 .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
291
292 Self::new_from_stream(stream.into_inner()).await
293 }
294
295 pub async fn new_from_stream(stream: TcpStream) -> Result<Self, ConnectionError> {
298 let (read_stream, write_stream) = stream.into_split();
299
300 Ok(Connection {
301 reader: ReadConnection {
302 raw: RawReadConnection {
303 read_stream,
304 buffer: Cursor::new(Vec::new()),
305 compression_threshold: None,
306 dec_cipher: None,
307 },
308 _reading: PhantomData,
309 },
310 writer: WriteConnection {
311 raw: RawWriteConnection {
312 write_stream,
313 compression_threshold: None,
314 enc_cipher: None,
315 },
316 _writing: PhantomData,
317 },
318 })
319 }
320
321 #[must_use]
324 pub fn login(self) -> Connection<ClientboundLoginPacket, ServerboundLoginPacket> {
325 Connection::from(self)
326 }
327
328 #[must_use]
331 pub fn status(self) -> Connection<ClientboundStatusPacket, ServerboundStatusPacket> {
332 Connection::from(self)
333 }
334}
335
336impl Connection<ClientboundLoginPacket, ServerboundLoginPacket> {
337 pub fn set_compression_threshold(&mut self, threshold: i32) {
341 if threshold >= 0 {
343 self.reader.raw.compression_threshold = Some(threshold as u32);
344 self.writer.raw.compression_threshold = Some(threshold as u32);
345 } else {
346 self.reader.raw.compression_threshold = None;
347 self.writer.raw.compression_threshold = None;
348 }
349 }
350
351 pub fn set_encryption_key(&mut self, key: [u8; 16]) {
354 let (enc_cipher, dec_cipher) = azalea_crypto::create_cipher(&key);
355 self.reader.raw.dec_cipher = Some(dec_cipher);
356 self.writer.raw.enc_cipher = Some(enc_cipher);
357 }
358
359 #[must_use]
362 pub fn config(self) -> Connection<ClientboundConfigPacket, ServerboundConfigPacket> {
363 Connection::from(self)
364 }
365
366 pub async fn authenticate(
418 &self,
419 access_token: &str,
420 uuid: &Uuid,
421 private_key: [u8; 16],
422 packet: &ClientboundHello,
423 ) -> Result<(), ClientSessionServerError> {
424 azalea_auth::sessionserver::join(
425 access_token,
426 &packet.public_key,
427 &private_key,
428 uuid,
429 &packet.server_id,
430 )
431 .await
432 }
433}
434
435impl Connection<ServerboundHandshakePacket, ClientboundHandshakePacket> {
436 #[must_use]
439 pub fn login(self) -> Connection<ServerboundLoginPacket, ClientboundLoginPacket> {
440 Connection::from(self)
441 }
442
443 #[must_use]
446 pub fn status(self) -> Connection<ServerboundStatusPacket, ClientboundStatusPacket> {
447 Connection::from(self)
448 }
449}
450
451impl Connection<ServerboundLoginPacket, ClientboundLoginPacket> {
452 pub fn set_compression_threshold(&mut self, threshold: i32) {
456 if threshold >= 0 {
458 self.reader.raw.compression_threshold = Some(threshold as u32);
459 self.writer.raw.compression_threshold = Some(threshold as u32);
460 } else {
461 self.reader.raw.compression_threshold = None;
462 self.writer.raw.compression_threshold = None;
463 }
464 }
465
466 pub fn set_encryption_key(&mut self, key: [u8; 16]) {
469 let (enc_cipher, dec_cipher) = azalea_crypto::create_cipher(&key);
470 self.reader.raw.dec_cipher = Some(dec_cipher);
471 self.writer.raw.enc_cipher = Some(enc_cipher);
472 }
473
474 #[must_use]
477 pub fn game(self) -> Connection<ServerboundGamePacket, ClientboundGamePacket> {
478 Connection::from(self)
479 }
480
481 pub async fn authenticate(
485 &self,
486 username: &str,
487 public_key: &[u8],
488 private_key: &[u8; 16],
489 ip: Option<&str>,
490 ) -> Result<GameProfile, ServerSessionServerError> {
491 azalea_auth::sessionserver::serverside_auth(username, public_key, private_key, ip).await
492 }
493
494 #[must_use]
496 pub fn config(self) -> Connection<ServerboundConfigPacket, ClientboundConfigPacket> {
497 Connection::from(self)
498 }
499}
500
501impl Connection<ServerboundConfigPacket, ClientboundConfigPacket> {
502 #[must_use]
505 pub fn game(self) -> Connection<ServerboundGamePacket, ClientboundGamePacket> {
506 Connection::from(self)
507 }
508}
509
510impl Connection<ClientboundConfigPacket, ServerboundConfigPacket> {
511 #[must_use]
514 pub fn game(self) -> Connection<ClientboundGamePacket, ServerboundGamePacket> {
515 Connection::from(self)
516 }
517}
518
519impl Connection<ClientboundGamePacket, ServerboundGamePacket> {
520 #[must_use]
522 pub fn config(self) -> Connection<ClientboundConfigPacket, ServerboundConfigPacket> {
523 Connection::from(self)
524 }
525}
526impl Connection<ServerboundGamePacket, ClientboundGamePacket> {
527 #[must_use]
529 pub fn config(self) -> Connection<ServerboundConfigPacket, ClientboundConfigPacket> {
530 Connection::from(self)
531 }
532}
533
534impl<R1, W1> Connection<R1, W1>
537where
538 R1: ProtocolPacket + Debug,
539 W1: ProtocolPacket + Debug,
540{
541 #[must_use]
544 pub fn from<R2, W2>(connection: Connection<R1, W1>) -> Connection<R2, W2>
545 where
546 R2: ProtocolPacket + Debug,
547 W2: ProtocolPacket + Debug,
548 {
549 Connection {
550 reader: ReadConnection {
551 raw: connection.reader.raw,
552 _reading: PhantomData,
553 },
554 writer: WriteConnection {
555 raw: connection.writer.raw,
556 _writing: PhantomData,
557 },
558 }
559 }
560
561 pub fn wrap(stream: TcpStream) -> Connection<R1, W1> {
563 let (read_stream, write_stream) = stream.into_split();
564
565 Connection {
566 reader: ReadConnection {
567 raw: RawReadConnection {
568 read_stream,
569 buffer: Cursor::new(Vec::new()),
570 compression_threshold: None,
571 dec_cipher: None,
572 },
573 _reading: PhantomData,
574 },
575 writer: WriteConnection {
576 raw: RawWriteConnection {
577 write_stream,
578 compression_threshold: None,
579 enc_cipher: None,
580 },
581 _writing: PhantomData,
582 },
583 }
584 }
585
586 pub fn unwrap(self) -> Result<TcpStream, ReuniteError> {
588 self.reader
589 .raw
590 .read_stream
591 .reunite(self.writer.raw.write_stream)
592 }
593}