1use std::fmt::Debug;
4use std::io::{self, Cursor};
5use std::marker::PhantomData;
6use std::net::SocketAddr;
7
8use azalea_auth::game_profile::GameProfile;
9use azalea_auth::sessionserver::{ClientSessionServerError, ServerSessionServerError};
10use azalea_crypto::{Aes128CfbDec, Aes128CfbEnc};
11use thiserror::Error;
12use tokio::io::{AsyncWriteExt, BufStream};
13use tokio::net::TcpStream;
14use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf, ReuniteError};
15use tracing::{error, info};
16use uuid::Uuid;
17
18use crate::packets::ProtocolPacket;
19use crate::packets::config::{ClientboundConfigPacket, ServerboundConfigPacket};
20use crate::packets::game::{ClientboundGamePacket, ServerboundGamePacket};
21use crate::packets::handshake::{ClientboundHandshakePacket, ServerboundHandshakePacket};
22use crate::packets::login::c_hello::ClientboundHello;
23use crate::packets::login::{ClientboundLoginPacket, ServerboundLoginPacket};
24use crate::packets::status::{ClientboundStatusPacket, ServerboundStatusPacket};
25use crate::read::{ReadPacketError, deserialize_packet, read_raw_packet, try_read_raw_packet};
26use crate::write::{serialize_packet, write_raw_packet};
27
/// The read half of a connection, working on raw (not yet deserialized)
/// packet bytes.
pub struct RawReadConnection {
    /// Owned read half of the underlying TCP stream.
    pub read_stream: OwnedReadHalf,
    /// Byte buffer handed to the raw packet readers together with the stream.
    pub buffer: Cursor<Vec<u8>>,
    /// Compression threshold forwarded to the raw packet readers; `None`
    /// until one is set.
    pub compression_threshold: Option<u32>,
    /// Decryption cipher forwarded to the raw packet readers; `None` until an
    /// encryption key is set.
    pub dec_cipher: Option<Aes128CfbDec>,
}
34
/// The write half of a connection, working on raw (already serialized)
/// packet bytes.
pub struct RawWriteConnection {
    /// Owned write half of the underlying TCP stream.
    pub write_stream: OwnedWriteHalf,
    /// Compression threshold forwarded to the raw packet writer; `None` until
    /// one is set.
    pub compression_threshold: Option<u32>,
    /// Encryption cipher forwarded to the raw packet writer; `None` until an
    /// encryption key is set.
    pub enc_cipher: Option<Aes128CfbEnc>,
}
40
/// The read half of a connection that yields packets of type `R`.
pub struct ReadConnection<R: ProtocolPacket> {
    /// The raw reader that produces the packet bytes.
    pub raw: RawReadConnection,
    /// Marker tying this reader to the packet type `R`; no `R` is stored.
    _reading: PhantomData<R>,
}
46
/// The write half of a connection that accepts packets of type `W`.
pub struct WriteConnection<W: ProtocolPacket> {
    /// The raw writer that sends the serialized packet bytes.
    pub raw: RawWriteConnection,
    /// Marker tying this writer to the packet type `W`; no `W` is stored.
    _writing: PhantomData<W>,
}
52
/// A connection that reads packets of type `R` and writes packets of type
/// `W`, made of an independent read half and write half.
pub struct Connection<R: ProtocolPacket, W: ProtocolPacket> {
    /// Half used for reading `R` packets.
    pub reader: ReadConnection<R>,
    /// Half used for writing `W` packets.
    pub writer: WriteConnection<W>,
}
135
136impl RawReadConnection {
137 pub async fn read(&mut self) -> Result<Box<[u8]>, Box<ReadPacketError>> {
138 read_raw_packet::<_>(
139 &mut self.read_stream,
140 &mut self.buffer,
141 self.compression_threshold,
142 &mut self.dec_cipher,
143 )
144 .await
145 }
146
147 pub fn try_read(&mut self) -> Result<Option<Box<[u8]>>, Box<ReadPacketError>> {
148 try_read_raw_packet::<_>(
149 &mut self.read_stream,
150 &mut self.buffer,
151 self.compression_threshold,
152 &mut self.dec_cipher,
153 )
154 }
155}
156
157impl RawWriteConnection {
158 pub async fn write(&mut self, packet: &[u8]) -> io::Result<()> {
159 if let Err(e) = write_raw_packet(
160 packet,
161 &mut self.write_stream,
162 self.compression_threshold,
163 &mut self.enc_cipher,
164 )
165 .await
166 {
167 if e.kind() == io::ErrorKind::BrokenPipe {
169 info!("Broken pipe, shutting down connection.");
170 if let Err(e) = self.shutdown().await {
171 error!("Couldn't shut down: {}", e);
172 }
173 }
174 return Err(e);
175 }
176 Ok(())
177 }
178
179 pub async fn shutdown(&mut self) -> io::Result<()> {
181 self.write_stream.shutdown().await
182 }
183}
184
185impl<R> ReadConnection<R>
186where
187 R: ProtocolPacket + Debug,
188{
189 pub async fn read(&mut self) -> Result<R, Box<ReadPacketError>> {
191 let raw_packet = self.raw.read().await?;
192 deserialize_packet(&mut Cursor::new(&raw_packet))
193 }
194
195 pub fn try_read(&mut self) -> Result<Option<R>, Box<ReadPacketError>> {
198 let Some(raw_packet) = self.raw.try_read()? else {
199 return Ok(None);
200 };
201 Ok(Some(deserialize_packet(&mut Cursor::new(&raw_packet))?))
202 }
203}
204impl<W> WriteConnection<W>
205where
206 W: ProtocolPacket + Debug,
207{
208 pub async fn write(&mut self, packet: W) -> io::Result<()> {
210 self.raw.write(&serialize_packet(&packet).unwrap()).await
211 }
212
213 pub async fn shutdown(&mut self) -> io::Result<()> {
215 self.raw.shutdown().await
216 }
217}
218
219impl<R, W> Connection<R, W>
220where
221 R: ProtocolPacket + Debug,
222 W: ProtocolPacket + Debug,
223{
224 pub async fn read(&mut self) -> Result<R, Box<ReadPacketError>> {
226 self.reader.read().await
227 }
228
229 pub fn try_read(&mut self) -> Result<Option<R>, Box<ReadPacketError>> {
232 self.reader.try_read()
233 }
234
235 pub async fn write(&mut self, packet: impl crate::packets::Packet<W>) -> io::Result<()> {
237 let packet = packet.into_variant();
238 self.writer.write(packet).await
239 }
240
241 #[must_use]
243 pub fn into_split(self) -> (ReadConnection<R>, WriteConnection<W>) {
244 (self.reader, self.writer)
245 }
246
247 #[must_use]
252 pub fn into_split_raw(self) -> (RawReadConnection, RawWriteConnection) {
253 (self.reader.raw, self.writer.raw)
254 }
255}
256
/// Errors returned when establishing a [`Connection`].
#[derive(Error, Debug)]
pub enum ConnectionError {
    /// An I/O error; displayed using the inner error's message. The `#[from]`
    /// attribute lets `?` convert an `io::Error` into this variant.
    #[error("{0}")]
    Io(#[from] io::Error),
}
262
263use socks5_impl::protocol::UserKey;
264
/// Address and optional credentials of a proxy to connect through.
#[derive(Debug, Clone)]
pub struct Proxy {
    /// Socket address of the proxy server.
    pub addr: SocketAddr,
    /// Credentials handed to the SOCKS5 client when connecting.
    // NOTE(review): `None` presumably means "no authentication" — confirm
    // against the socks5_impl documentation.
    pub auth: Option<UserKey>,
}
270
271impl Proxy {
272 pub fn new(addr: SocketAddr, auth: Option<UserKey>) -> Self {
273 Self { addr, auth }
274 }
275}
276
277impl Connection<ClientboundHandshakePacket, ServerboundHandshakePacket> {
278 pub async fn new(address: &SocketAddr) -> Result<Self, ConnectionError> {
280 let stream = TcpStream::connect(address).await?;
281
282 stream.set_nodelay(true)?;
284
285 Self::new_from_stream(stream).await
286 }
287
288 pub async fn new_with_proxy(
291 address: &SocketAddr,
292 proxy: Proxy,
293 ) -> Result<Self, ConnectionError> {
294 let proxy_stream = TcpStream::connect(proxy.addr).await?;
295 let mut stream = BufStream::new(proxy_stream);
296
297 let _ = socks5_impl::client::connect(&mut stream, address, proxy.auth)
298 .await
299 .map_err(io::Error::other)?;
300
301 Self::new_from_stream(stream.into_inner()).await
302 }
303
304 pub async fn new_from_stream(stream: TcpStream) -> Result<Self, ConnectionError> {
307 let (read_stream, write_stream) = stream.into_split();
308
309 Ok(Connection {
310 reader: ReadConnection {
311 raw: RawReadConnection {
312 read_stream,
313 buffer: Cursor::new(Vec::new()),
314 compression_threshold: None,
315 dec_cipher: None,
316 },
317 _reading: PhantomData,
318 },
319 writer: WriteConnection {
320 raw: RawWriteConnection {
321 write_stream,
322 compression_threshold: None,
323 enc_cipher: None,
324 },
325 _writing: PhantomData,
326 },
327 })
328 }
329
330 #[must_use]
333 pub fn login(self) -> Connection<ClientboundLoginPacket, ServerboundLoginPacket> {
334 Connection::from(self)
335 }
336
337 #[must_use]
340 pub fn status(self) -> Connection<ClientboundStatusPacket, ServerboundStatusPacket> {
341 Connection::from(self)
342 }
343}
344
345impl Connection<ClientboundLoginPacket, ServerboundLoginPacket> {
346 pub fn set_compression_threshold(&mut self, threshold: i32) {
351 if threshold >= 0 {
353 self.reader.raw.compression_threshold = Some(threshold as u32);
354 self.writer.raw.compression_threshold = Some(threshold as u32);
355 } else {
356 self.reader.raw.compression_threshold = None;
357 self.writer.raw.compression_threshold = None;
358 }
359 }
360
361 pub fn set_encryption_key(&mut self, key: [u8; 16]) {
364 let (enc_cipher, dec_cipher) = azalea_crypto::create_cipher(&key);
365 self.reader.raw.dec_cipher = Some(dec_cipher);
366 self.writer.raw.enc_cipher = Some(enc_cipher);
367 }
368
369 #[must_use]
372 pub fn config(self) -> Connection<ClientboundConfigPacket, ServerboundConfigPacket> {
373 Connection::from(self)
374 }
375
376 pub async fn authenticate(
428 &self,
429 access_token: &str,
430 uuid: &Uuid,
431 private_key: [u8; 16],
432 packet: &ClientboundHello,
433 ) -> Result<(), ClientSessionServerError> {
434 azalea_auth::sessionserver::join(
435 access_token,
436 &packet.public_key,
437 &private_key,
438 uuid,
439 &packet.server_id,
440 )
441 .await
442 }
443}
444
445impl Connection<ServerboundHandshakePacket, ClientboundHandshakePacket> {
446 #[must_use]
449 pub fn login(self) -> Connection<ServerboundLoginPacket, ClientboundLoginPacket> {
450 Connection::from(self)
451 }
452
453 #[must_use]
456 pub fn status(self) -> Connection<ServerboundStatusPacket, ClientboundStatusPacket> {
457 Connection::from(self)
458 }
459}
460
461impl Connection<ServerboundLoginPacket, ClientboundLoginPacket> {
462 pub fn set_compression_threshold(&mut self, threshold: i32) {
466 if threshold >= 0 {
468 self.reader.raw.compression_threshold = Some(threshold as u32);
469 self.writer.raw.compression_threshold = Some(threshold as u32);
470 } else {
471 self.reader.raw.compression_threshold = None;
472 self.writer.raw.compression_threshold = None;
473 }
474 }
475
476 pub fn set_encryption_key(&mut self, key: [u8; 16]) {
479 let (enc_cipher, dec_cipher) = azalea_crypto::create_cipher(&key);
480 self.reader.raw.dec_cipher = Some(dec_cipher);
481 self.writer.raw.enc_cipher = Some(enc_cipher);
482 }
483
484 #[must_use]
487 pub fn game(self) -> Connection<ServerboundGamePacket, ClientboundGamePacket> {
488 Connection::from(self)
489 }
490
491 pub async fn authenticate(
495 &self,
496 username: &str,
497 public_key: &[u8],
498 private_key: &[u8; 16],
499 ip: Option<&str>,
500 ) -> Result<GameProfile, ServerSessionServerError> {
501 azalea_auth::sessionserver::serverside_auth(username, public_key, private_key, ip).await
502 }
503
504 #[must_use]
506 pub fn config(self) -> Connection<ServerboundConfigPacket, ClientboundConfigPacket> {
507 Connection::from(self)
508 }
509}
510
511impl Connection<ServerboundConfigPacket, ClientboundConfigPacket> {
512 #[must_use]
515 pub fn game(self) -> Connection<ServerboundGamePacket, ClientboundGamePacket> {
516 Connection::from(self)
517 }
518}
519
520impl Connection<ClientboundConfigPacket, ServerboundConfigPacket> {
521 #[must_use]
524 pub fn game(self) -> Connection<ClientboundGamePacket, ServerboundGamePacket> {
525 Connection::from(self)
526 }
527}
528
529impl Connection<ClientboundGamePacket, ServerboundGamePacket> {
530 #[must_use]
532 pub fn config(self) -> Connection<ClientboundConfigPacket, ServerboundConfigPacket> {
533 Connection::from(self)
534 }
535}
536impl Connection<ServerboundGamePacket, ClientboundGamePacket> {
537 #[must_use]
539 pub fn config(self) -> Connection<ServerboundConfigPacket, ClientboundConfigPacket> {
540 Connection::from(self)
541 }
542}
543
544impl<R1, W1> Connection<R1, W1>
547where
548 R1: ProtocolPacket + Debug,
549 W1: ProtocolPacket + Debug,
550{
551 #[must_use]
554 pub fn from<R2, W2>(connection: Connection<R1, W1>) -> Connection<R2, W2>
555 where
556 R2: ProtocolPacket + Debug,
557 W2: ProtocolPacket + Debug,
558 {
559 Connection {
560 reader: ReadConnection {
561 raw: connection.reader.raw,
562 _reading: PhantomData,
563 },
564 writer: WriteConnection {
565 raw: connection.writer.raw,
566 _writing: PhantomData,
567 },
568 }
569 }
570
571 pub fn wrap(stream: TcpStream) -> Connection<R1, W1> {
573 let (read_stream, write_stream) = stream.into_split();
574
575 Connection {
576 reader: ReadConnection {
577 raw: RawReadConnection {
578 read_stream,
579 buffer: Cursor::new(Vec::new()),
580 compression_threshold: None,
581 dec_cipher: None,
582 },
583 _reading: PhantomData,
584 },
585 writer: WriteConnection {
586 raw: RawWriteConnection {
587 write_stream,
588 compression_threshold: None,
589 enc_cipher: None,
590 },
591 _writing: PhantomData,
592 },
593 }
594 }
595
596 pub fn unwrap(self) -> Result<TcpStream, ReuniteError> {
598 self.reader
599 .raw
600 .read_stream
601 .reunite(self.writer.raw.write_stream)
602 }
603}