1use std::{
4 fmt::{self, Debug, Display},
5 io::{self, Cursor},
6 marker::PhantomData,
7 net::SocketAddr,
8};
9
10use azalea_auth::{
11 game_profile::GameProfile,
12 sessionserver::{ClientSessionServerError, ServerSessionServerError},
13};
14use azalea_crypto::{Aes128CfbDec, Aes128CfbEnc};
15use thiserror::Error;
16use tokio::{
17 io::{AsyncWriteExt, BufStream},
18 net::{
19 TcpStream,
20 tcp::{OwnedReadHalf, OwnedWriteHalf, ReuniteError},
21 },
22};
23use tracing::{error, info};
24use uuid::Uuid;
25
26use crate::{
27 packets::{
28 ProtocolPacket,
29 config::{ClientboundConfigPacket, ServerboundConfigPacket},
30 game::{ClientboundGamePacket, ServerboundGamePacket},
31 handshake::{ClientboundHandshakePacket, ServerboundHandshakePacket},
32 login::{ClientboundLoginPacket, ServerboundLoginPacket, c_hello::ClientboundHello},
33 status::{ClientboundStatusPacket, ServerboundStatusPacket},
34 },
35 read::{ReadPacketError, deserialize_packet, read_raw_packet, try_read_raw_packet},
36 write::{serialize_packet, write_raw_packet},
37};
38
39pub struct RawReadConnection {
40 pub read_stream: OwnedReadHalf,
41 pub buffer: Cursor<Vec<u8>>,
42 pub compression_threshold: Option<u32>,
43 pub dec_cipher: Option<Aes128CfbDec>,
44}
45
46pub struct RawWriteConnection {
47 pub write_stream: OwnedWriteHalf,
48 pub compression_threshold: Option<u32>,
49 pub enc_cipher: Option<Aes128CfbEnc>,
50}
51
52pub struct ReadConnection<R: ProtocolPacket> {
54 pub raw: RawReadConnection,
55 _reading: PhantomData<R>,
56}
57
58pub struct WriteConnection<W: ProtocolPacket> {
60 pub raw: RawWriteConnection,
61 _writing: PhantomData<W>,
62}
63
64pub struct Connection<R: ProtocolPacket, W: ProtocolPacket> {
141 pub reader: ReadConnection<R>,
142 pub writer: WriteConnection<W>,
143}
144
145impl RawReadConnection {
146 pub async fn read(&mut self) -> Result<Box<[u8]>, Box<ReadPacketError>> {
147 read_raw_packet::<_>(
148 &mut self.read_stream,
149 &mut self.buffer,
150 self.compression_threshold,
151 &mut self.dec_cipher,
152 )
153 .await
154 }
155
156 pub fn try_read(&mut self) -> Result<Option<Box<[u8]>>, Box<ReadPacketError>> {
157 try_read_raw_packet::<_>(
158 &mut self.read_stream,
159 &mut self.buffer,
160 self.compression_threshold,
161 &mut self.dec_cipher,
162 )
163 }
164}
165
166impl RawWriteConnection {
167 pub async fn write(&mut self, packet: &[u8]) -> io::Result<()> {
168 if let Err(e) = write_raw_packet(
169 packet,
170 &mut self.write_stream,
171 self.compression_threshold,
172 &mut self.enc_cipher,
173 )
174 .await
175 {
176 if e.kind() == io::ErrorKind::BrokenPipe {
178 info!("Broken pipe, shutting down connection.");
179 if let Err(e) = self.shutdown().await {
180 error!("Couldn't shut down: {}", e);
181 }
182 }
183 return Err(e);
184 }
185 Ok(())
186 }
187
188 pub async fn shutdown(&mut self) -> io::Result<()> {
190 self.write_stream.shutdown().await
191 }
192}
193
194impl<R> ReadConnection<R>
195where
196 R: ProtocolPacket + Debug,
197{
198 pub async fn read(&mut self) -> Result<R, Box<ReadPacketError>> {
200 let raw_packet = self.raw.read().await?;
201 deserialize_packet(&mut Cursor::new(&raw_packet))
202 }
203
204 pub fn try_read(&mut self) -> Result<Option<R>, Box<ReadPacketError>> {
207 let Some(raw_packet) = self.raw.try_read()? else {
208 return Ok(None);
209 };
210 Ok(Some(deserialize_packet(&mut Cursor::new(&raw_packet))?))
211 }
212}
213impl<W> WriteConnection<W>
214where
215 W: ProtocolPacket + Debug,
216{
217 pub async fn write(&mut self, packet: W) -> io::Result<()> {
219 self.raw.write(&serialize_packet(&packet).unwrap()).await
220 }
221
222 pub async fn shutdown(&mut self) -> io::Result<()> {
224 self.raw.shutdown().await
225 }
226}
227
228impl<R, W> Connection<R, W>
229where
230 R: ProtocolPacket + Debug,
231 W: ProtocolPacket + Debug,
232{
233 pub async fn read(&mut self) -> Result<R, Box<ReadPacketError>> {
235 self.reader.read().await
236 }
237
238 pub fn try_read(&mut self) -> Result<Option<R>, Box<ReadPacketError>> {
241 self.reader.try_read()
242 }
243
244 pub async fn write(&mut self, packet: impl crate::packets::Packet<W>) -> io::Result<()> {
246 let packet = packet.into_variant();
247 self.writer.write(packet).await
248 }
249
250 #[must_use]
254 pub fn into_split(self) -> (ReadConnection<R>, WriteConnection<W>) {
255 (self.reader, self.writer)
256 }
257
258 #[must_use]
263 pub fn into_split_raw(self) -> (RawReadConnection, RawWriteConnection) {
264 (self.reader.raw, self.writer.raw)
265 }
266}
267
268#[derive(Error, Debug)]
269pub enum ConnectionError {
270 #[error("{0}")]
271 Io(#[from] io::Error),
272}
273
274use socks5_impl::protocol::UserKey;
275
276#[derive(Debug, Clone)]
278pub struct Proxy {
279 pub addr: SocketAddr,
280 pub auth: Option<UserKey>,
281}
282
283impl Proxy {
284 pub fn new(addr: SocketAddr, auth: Option<UserKey>) -> Self {
285 Self { addr, auth }
286 }
287}
288impl Display for Proxy {
289 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
290 write!(f, "socks5://")?;
291 if let Some(auth) = &self.auth {
292 write!(f, "{auth}@")?;
293 }
294 write!(f, "{}", self.addr)
295 }
296}
297
298impl Connection<ClientboundHandshakePacket, ServerboundHandshakePacket> {
299 pub async fn new(address: &SocketAddr) -> Result<Self, ConnectionError> {
301 let stream = TcpStream::connect(address).await?;
302
303 stream.set_nodelay(true)?;
305
306 Self::new_from_stream(stream).await
307 }
308
309 pub async fn new_with_proxy(
313 address: &SocketAddr,
314 proxy: Proxy,
315 ) -> Result<Self, ConnectionError> {
316 let proxy_stream = TcpStream::connect(proxy.addr).await?;
317 let mut stream = BufStream::new(proxy_stream);
318
319 let _ = socks5_impl::client::connect(&mut stream, address, proxy.auth)
320 .await
321 .map_err(io::Error::other)?;
322
323 Self::new_from_stream(stream.into_inner()).await
324 }
325
326 pub async fn new_from_stream(stream: TcpStream) -> Result<Self, ConnectionError> {
331 let (read_stream, write_stream) = stream.into_split();
332
333 Ok(Connection {
334 reader: ReadConnection {
335 raw: RawReadConnection {
336 read_stream,
337 buffer: Cursor::new(Vec::new()),
338 compression_threshold: None,
339 dec_cipher: None,
340 },
341 _reading: PhantomData,
342 },
343 writer: WriteConnection {
344 raw: RawWriteConnection {
345 write_stream,
346 compression_threshold: None,
347 enc_cipher: None,
348 },
349 _writing: PhantomData,
350 },
351 })
352 }
353
354 #[must_use]
358 pub fn login(self) -> Connection<ClientboundLoginPacket, ServerboundLoginPacket> {
359 Connection::from(self)
360 }
361
362 #[must_use]
366 pub fn status(self) -> Connection<ClientboundStatusPacket, ServerboundStatusPacket> {
367 Connection::from(self)
368 }
369}
370
371impl Connection<ClientboundLoginPacket, ServerboundLoginPacket> {
372 pub fn set_compression_threshold(&mut self, threshold: i32) {
378 if threshold >= 0 {
380 self.reader.raw.compression_threshold = Some(threshold as u32);
381 self.writer.raw.compression_threshold = Some(threshold as u32);
382 } else {
383 self.reader.raw.compression_threshold = None;
384 self.writer.raw.compression_threshold = None;
385 }
386 }
387
388 pub fn set_encryption_key(&mut self, key: [u8; 16]) {
392 let (enc_cipher, dec_cipher) = azalea_crypto::create_cipher(&key);
393 self.reader.raw.dec_cipher = Some(dec_cipher);
394 self.writer.raw.enc_cipher = Some(enc_cipher);
395 }
396
397 #[must_use]
402 pub fn config(self) -> Connection<ClientboundConfigPacket, ServerboundConfigPacket> {
403 Connection::from(self)
404 }
405
406 pub async fn authenticate(
457 &self,
458 access_token: &str,
459 uuid: &Uuid,
460 private_key: [u8; 16],
461 packet: &ClientboundHello,
462 ) -> Result<(), ClientSessionServerError> {
463 azalea_auth::sessionserver::join(
464 access_token,
465 &packet.public_key,
466 &private_key,
467 uuid,
468 &packet.server_id,
469 )
470 .await
471 }
472}
473
474impl Connection<ServerboundHandshakePacket, ClientboundHandshakePacket> {
475 #[must_use]
480 pub fn login(self) -> Connection<ServerboundLoginPacket, ClientboundLoginPacket> {
481 Connection::from(self)
482 }
483
484 #[must_use]
488 pub fn status(self) -> Connection<ServerboundStatusPacket, ClientboundStatusPacket> {
489 Connection::from(self)
490 }
491}
492
493impl Connection<ServerboundLoginPacket, ClientboundLoginPacket> {
494 pub fn set_compression_threshold(&mut self, threshold: i32) {
499 if threshold >= 0 {
501 self.reader.raw.compression_threshold = Some(threshold as u32);
502 self.writer.raw.compression_threshold = Some(threshold as u32);
503 } else {
504 self.reader.raw.compression_threshold = None;
505 self.writer.raw.compression_threshold = None;
506 }
507 }
508
509 pub fn set_encryption_key(&mut self, key: [u8; 16]) {
513 let (enc_cipher, dec_cipher) = azalea_crypto::create_cipher(&key);
514 self.reader.raw.dec_cipher = Some(dec_cipher);
515 self.writer.raw.enc_cipher = Some(enc_cipher);
516 }
517
518 #[must_use]
522 pub fn game(self) -> Connection<ServerboundGamePacket, ClientboundGamePacket> {
523 Connection::from(self)
524 }
525
526 pub async fn authenticate(
530 &self,
531 username: &str,
532 public_key: &[u8],
533 private_key: &[u8; 16],
534 ip: Option<&str>,
535 ) -> Result<GameProfile, ServerSessionServerError> {
536 azalea_auth::sessionserver::serverside_auth(username, public_key, private_key, ip).await
537 }
538
539 #[must_use]
541 pub fn config(self) -> Connection<ServerboundConfigPacket, ClientboundConfigPacket> {
542 Connection::from(self)
543 }
544}
545
546impl Connection<ServerboundConfigPacket, ClientboundConfigPacket> {
547 #[must_use]
551 pub fn game(self) -> Connection<ServerboundGamePacket, ClientboundGamePacket> {
552 Connection::from(self)
553 }
554}
555
556impl Connection<ClientboundConfigPacket, ServerboundConfigPacket> {
557 #[must_use]
561 pub fn game(self) -> Connection<ClientboundGamePacket, ServerboundGamePacket> {
562 Connection::from(self)
563 }
564}
565
566impl Connection<ClientboundGamePacket, ServerboundGamePacket> {
567 #[must_use]
569 pub fn config(self) -> Connection<ClientboundConfigPacket, ServerboundConfigPacket> {
570 Connection::from(self)
571 }
572}
573impl Connection<ServerboundGamePacket, ClientboundGamePacket> {
574 #[must_use]
576 pub fn config(self) -> Connection<ServerboundConfigPacket, ClientboundConfigPacket> {
577 Connection::from(self)
578 }
579}
580
581impl<R1, W1> Connection<R1, W1>
584where
585 R1: ProtocolPacket + Debug,
586 W1: ProtocolPacket + Debug,
587{
588 #[must_use]
591 pub fn from<R2, W2>(connection: Connection<R1, W1>) -> Connection<R2, W2>
592 where
593 R2: ProtocolPacket + Debug,
594 W2: ProtocolPacket + Debug,
595 {
596 Connection {
597 reader: ReadConnection {
598 raw: connection.reader.raw,
599 _reading: PhantomData,
600 },
601 writer: WriteConnection {
602 raw: connection.writer.raw,
603 _writing: PhantomData,
604 },
605 }
606 }
607
608 pub fn wrap(stream: TcpStream) -> Connection<R1, W1> {
610 let (read_stream, write_stream) = stream.into_split();
611
612 Connection {
613 reader: ReadConnection {
614 raw: RawReadConnection {
615 read_stream,
616 buffer: Cursor::new(Vec::new()),
617 compression_threshold: None,
618 dec_cipher: None,
619 },
620 _reading: PhantomData,
621 },
622 writer: WriteConnection {
623 raw: RawWriteConnection {
624 write_stream,
625 compression_threshold: None,
626 enc_cipher: None,
627 },
628 _writing: PhantomData,
629 },
630 }
631 }
632
633 pub fn unwrap(self) -> Result<TcpStream, ReuniteError> {
635 self.reader
636 .raw
637 .read_stream
638 .reunite(self.writer.raw.write_stream)
639 }
640}