1use std::{
4 fmt::{self, Debug, Display},
5 io::{self, Cursor},
6 marker::PhantomData,
7 net::SocketAddr,
8};
9
10#[cfg(feature = "online-mode")]
11use azalea_auth::{
12 game_profile::GameProfile,
13 sessionserver::{ClientSessionServerError, ServerSessionServerError},
14};
15use azalea_crypto::{Aes128CfbDec, Aes128CfbEnc};
16use thiserror::Error;
17use tokio::{
18 io::{AsyncWriteExt, BufStream},
19 net::{
20 TcpStream,
21 tcp::{OwnedReadHalf, OwnedWriteHalf, ReuniteError},
22 },
23};
24use tracing::{error, info};
25#[cfg(feature = "online-mode")]
26use uuid::Uuid;
27
28#[cfg(feature = "online-mode")]
29use crate::packets::login::ClientboundHello;
30use crate::{
31 packets::{
32 ProtocolPacket,
33 config::{ClientboundConfigPacket, ServerboundConfigPacket},
34 game::{ClientboundGamePacket, ServerboundGamePacket},
35 handshake::{ClientboundHandshakePacket, ServerboundHandshakePacket},
36 login::{ClientboundLoginPacket, ServerboundLoginPacket},
37 status::{ClientboundStatusPacket, ServerboundStatusPacket},
38 },
39 read::{ReadPacketError, deserialize_packet, read_raw_packet, try_read_raw_packet},
40 write::{serialize_packet, write_raw_packet},
41};
42
/// The read half of a connection, yielding packets as raw byte buffers that
/// have not been deserialized into a typed packet yet.
pub struct RawReadConnection {
    /// Owned read half of the underlying TCP stream.
    pub read_stream: OwnedReadHalf,
    /// Byte buffer handed to the packet-reading helpers together with the
    /// stream on every read.
    pub buffer: Cursor<Vec<u8>>,
    /// Compression threshold handed to the packet reader; `None` when no
    /// threshold has been set.
    pub compression_threshold: Option<u32>,
    /// Decryption cipher handed to the packet reader; `None` when no
    /// encryption key has been set.
    pub dec_cipher: Option<Aes128CfbDec>,
}
49
/// The write half of a connection, accepting packets that have already been
/// serialized to bytes.
pub struct RawWriteConnection {
    /// Owned write half of the underlying TCP stream.
    pub write_stream: OwnedWriteHalf,
    /// Compression threshold handed to the packet writer; `None` when no
    /// threshold has been set.
    pub compression_threshold: Option<u32>,
    /// Encryption cipher handed to the packet writer; `None` when no
    /// encryption key has been set.
    pub enc_cipher: Option<Aes128CfbEnc>,
}
55
/// The read half of a connection that deserializes incoming packets into the
/// packet type `R`.
pub struct ReadConnection<R: ProtocolPacket> {
    /// The untyped reader that produces the raw packet bytes.
    pub raw: RawReadConnection,
    /// Zero-sized marker tying this reader to the packet type `R`.
    _reading: PhantomData<R>,
}
61
/// The write half of a connection that serializes outgoing packets of type
/// `W`.
pub struct WriteConnection<W: ProtocolPacket> {
    /// The untyped writer that sends the serialized packet bytes.
    pub raw: RawWriteConnection,
    /// Zero-sized marker tying this writer to the packet type `W`.
    _writing: PhantomData<W>,
}
67
/// A typed, bidirectional packet connection that reads packets of type `R`
/// and writes packets of type `W`.
///
/// The packet types are tracked purely at the type level; converting between
/// states keeps the same underlying reader and writer.
pub struct Connection<R: ProtocolPacket, W: ProtocolPacket> {
    /// Half used for receiving packets.
    pub reader: ReadConnection<R>,
    /// Half used for sending packets.
    pub writer: WriteConnection<W>,
}
148
149impl RawReadConnection {
150 pub async fn read(&mut self) -> Result<Box<[u8]>, Box<ReadPacketError>> {
151 read_raw_packet::<_>(
152 &mut self.read_stream,
153 &mut self.buffer,
154 self.compression_threshold,
155 &mut self.dec_cipher,
156 )
157 .await
158 }
159
160 pub fn try_read(&mut self) -> Result<Option<Box<[u8]>>, Box<ReadPacketError>> {
161 try_read_raw_packet::<_>(
162 &mut self.read_stream,
163 &mut self.buffer,
164 self.compression_threshold,
165 &mut self.dec_cipher,
166 )
167 }
168}
169
170impl RawWriteConnection {
171 pub async fn write(&mut self, packet: &[u8]) -> io::Result<()> {
172 if let Err(e) = write_raw_packet(
173 packet,
174 &mut self.write_stream,
175 self.compression_threshold,
176 &mut self.enc_cipher,
177 )
178 .await
179 {
180 if e.kind() == io::ErrorKind::BrokenPipe {
182 info!("Broken pipe, shutting down connection.");
183 if let Err(e) = self.shutdown().await {
184 error!("Couldn't shut down: {}", e);
185 }
186 }
187 return Err(e);
188 }
189 Ok(())
190 }
191
192 pub async fn shutdown(&mut self) -> io::Result<()> {
194 self.write_stream.shutdown().await
195 }
196}
197
198impl<R> ReadConnection<R>
199where
200 R: ProtocolPacket + Debug,
201{
202 pub async fn read(&mut self) -> Result<R, Box<ReadPacketError>> {
204 let raw_packet = self.raw.read().await?;
205 deserialize_packet(&mut Cursor::new(&raw_packet))
206 }
207
208 pub fn try_read(&mut self) -> Result<Option<R>, Box<ReadPacketError>> {
211 let Some(raw_packet) = self.raw.try_read()? else {
212 return Ok(None);
213 };
214 Ok(Some(deserialize_packet(&mut Cursor::new(&raw_packet))?))
215 }
216}
217impl<W> WriteConnection<W>
218where
219 W: ProtocolPacket + Debug,
220{
221 pub async fn write(&mut self, packet: W) -> io::Result<()> {
223 self.raw.write(&serialize_packet(&packet).unwrap()).await
224 }
225
226 pub async fn shutdown(&mut self) -> io::Result<()> {
228 self.raw.shutdown().await
229 }
230}
231
232impl<R, W> Connection<R, W>
233where
234 R: ProtocolPacket + Debug,
235 W: ProtocolPacket + Debug,
236{
237 pub async fn read(&mut self) -> Result<R, Box<ReadPacketError>> {
239 self.reader.read().await
240 }
241
242 pub fn try_read(&mut self) -> Result<Option<R>, Box<ReadPacketError>> {
245 self.reader.try_read()
246 }
247
248 pub async fn write(&mut self, packet: impl crate::packets::Packet<W>) -> io::Result<()> {
250 let packet = packet.into_variant();
251 self.writer.write(packet).await
252 }
253
254 #[must_use]
258 pub fn into_split(self) -> (ReadConnection<R>, WriteConnection<W>) {
259 (self.reader, self.writer)
260 }
261
262 #[must_use]
267 pub fn into_split_raw(self) -> (RawReadConnection, RawWriteConnection) {
268 (self.reader.raw, self.writer.raw)
269 }
270}
271
/// Errors that can occur while establishing a [`Connection`].
#[derive(Error, Debug)]
pub enum ConnectionError {
    /// An underlying I/O operation failed. Displayed as the wrapped error's
    /// own message.
    #[error("{0}")]
    Io(#[from] io::Error),
}
277
278use socks5_impl::protocol::UserKey;
279
/// Settings for reaching a server through a SOCKS5 proxy.
#[derive(Debug, Clone)]
pub struct Proxy {
    /// Socket address of the proxy server itself.
    pub addr: SocketAddr,
    /// Optional credentials passed to the proxy; `None` for no
    /// authentication.
    pub auth: Option<UserKey>,
}
286
287impl Proxy {
288 pub fn new(addr: SocketAddr, auth: Option<UserKey>) -> Self {
289 Self { addr, auth }
290 }
291}
292impl Display for Proxy {
293 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
294 write!(f, "socks5://")?;
295 if let Some(auth) = &self.auth {
296 write!(f, "{auth}@")?;
297 }
298 write!(f, "{}", self.addr)
299 }
300}
301
302impl Connection<ClientboundHandshakePacket, ServerboundHandshakePacket> {
303 pub async fn new(address: &SocketAddr) -> Result<Self, ConnectionError> {
305 let stream = TcpStream::connect(address).await?;
306
307 stream.set_nodelay(true)?;
309
310 Self::new_from_stream(stream).await
311 }
312
313 pub async fn new_with_proxy(
317 address: &SocketAddr,
318 proxy: Proxy,
319 ) -> Result<Self, ConnectionError> {
320 let proxy_stream = TcpStream::connect(proxy.addr).await?;
321 let mut stream = BufStream::new(proxy_stream);
322
323 let _ = socks5_impl::client::connect(&mut stream, address, proxy.auth)
324 .await
325 .map_err(io::Error::other)?;
326
327 Self::new_from_stream(stream.into_inner()).await
328 }
329
330 pub async fn new_from_stream(stream: TcpStream) -> Result<Self, ConnectionError> {
335 let (read_stream, write_stream) = stream.into_split();
336
337 Ok(Connection {
338 reader: ReadConnection {
339 raw: RawReadConnection {
340 read_stream,
341 buffer: Cursor::new(Vec::new()),
342 compression_threshold: None,
343 dec_cipher: None,
344 },
345 _reading: PhantomData,
346 },
347 writer: WriteConnection {
348 raw: RawWriteConnection {
349 write_stream,
350 compression_threshold: None,
351 enc_cipher: None,
352 },
353 _writing: PhantomData,
354 },
355 })
356 }
357
358 #[must_use]
362 pub fn login(self) -> Connection<ClientboundLoginPacket, ServerboundLoginPacket> {
363 Connection::from(self)
364 }
365
366 #[must_use]
370 pub fn status(self) -> Connection<ClientboundStatusPacket, ServerboundStatusPacket> {
371 Connection::from(self)
372 }
373}
374
375impl Connection<ClientboundLoginPacket, ServerboundLoginPacket> {
376 pub fn set_compression_threshold(&mut self, threshold: i32) {
382 if threshold >= 0 {
384 self.reader.raw.compression_threshold = Some(threshold as u32);
385 self.writer.raw.compression_threshold = Some(threshold as u32);
386 } else {
387 self.reader.raw.compression_threshold = None;
388 self.writer.raw.compression_threshold = None;
389 }
390 }
391
392 pub fn set_encryption_key(&mut self, key: [u8; 16]) {
396 let (enc_cipher, dec_cipher) = azalea_crypto::create_cipher(&key);
397 self.reader.raw.dec_cipher = Some(dec_cipher);
398 self.writer.raw.enc_cipher = Some(enc_cipher);
399 }
400
401 #[must_use]
406 pub fn config(self) -> Connection<ClientboundConfigPacket, ServerboundConfigPacket> {
407 Connection::from(self)
408 }
409
410 #[cfg(feature = "online-mode")]
461 pub async fn authenticate(
462 &self,
463 access_token: &str,
464 uuid: &Uuid,
465 private_key: [u8; 16],
466 packet: &ClientboundHello,
467 ) -> Result<(), ClientSessionServerError> {
468 azalea_auth::sessionserver::join(
469 access_token,
470 &packet.public_key,
471 &private_key,
472 uuid,
473 &packet.server_id,
474 )
475 .await
476 }
477}
478
479impl Connection<ServerboundHandshakePacket, ClientboundHandshakePacket> {
480 #[must_use]
485 pub fn login(self) -> Connection<ServerboundLoginPacket, ClientboundLoginPacket> {
486 Connection::from(self)
487 }
488
489 #[must_use]
493 pub fn status(self) -> Connection<ServerboundStatusPacket, ClientboundStatusPacket> {
494 Connection::from(self)
495 }
496}
497
498impl Connection<ServerboundLoginPacket, ClientboundLoginPacket> {
499 pub fn set_compression_threshold(&mut self, threshold: i32) {
504 if threshold >= 0 {
506 self.reader.raw.compression_threshold = Some(threshold as u32);
507 self.writer.raw.compression_threshold = Some(threshold as u32);
508 } else {
509 self.reader.raw.compression_threshold = None;
510 self.writer.raw.compression_threshold = None;
511 }
512 }
513
514 pub fn set_encryption_key(&mut self, key: [u8; 16]) {
518 let (enc_cipher, dec_cipher) = azalea_crypto::create_cipher(&key);
519 self.reader.raw.dec_cipher = Some(dec_cipher);
520 self.writer.raw.enc_cipher = Some(enc_cipher);
521 }
522
523 #[must_use]
527 pub fn game(self) -> Connection<ServerboundGamePacket, ClientboundGamePacket> {
528 Connection::from(self)
529 }
530
531 #[cfg(feature = "online-mode")]
535 pub async fn authenticate(
536 &self,
537 username: &str,
538 public_key: &[u8],
539 private_key: &[u8; 16],
540 ip: Option<&str>,
541 ) -> Result<GameProfile, ServerSessionServerError> {
542 azalea_auth::sessionserver::serverside_auth(username, public_key, private_key, ip).await
543 }
544
545 #[must_use]
547 pub fn config(self) -> Connection<ServerboundConfigPacket, ClientboundConfigPacket> {
548 Connection::from(self)
549 }
550}
551
552impl Connection<ServerboundConfigPacket, ClientboundConfigPacket> {
553 #[must_use]
557 pub fn game(self) -> Connection<ServerboundGamePacket, ClientboundGamePacket> {
558 Connection::from(self)
559 }
560}
561
562impl Connection<ClientboundConfigPacket, ServerboundConfigPacket> {
563 #[must_use]
567 pub fn game(self) -> Connection<ClientboundGamePacket, ServerboundGamePacket> {
568 Connection::from(self)
569 }
570}
571
572impl Connection<ClientboundGamePacket, ServerboundGamePacket> {
573 #[must_use]
575 pub fn config(self) -> Connection<ClientboundConfigPacket, ServerboundConfigPacket> {
576 Connection::from(self)
577 }
578}
579impl Connection<ServerboundGamePacket, ClientboundGamePacket> {
580 #[must_use]
582 pub fn config(self) -> Connection<ServerboundConfigPacket, ClientboundConfigPacket> {
583 Connection::from(self)
584 }
585}
586
587impl<R1, W1> Connection<R1, W1>
590where
591 R1: ProtocolPacket + Debug,
592 W1: ProtocolPacket + Debug,
593{
594 #[must_use]
597 pub fn from<R2, W2>(connection: Connection<R1, W1>) -> Connection<R2, W2>
598 where
599 R2: ProtocolPacket + Debug,
600 W2: ProtocolPacket + Debug,
601 {
602 Connection {
603 reader: ReadConnection {
604 raw: connection.reader.raw,
605 _reading: PhantomData,
606 },
607 writer: WriteConnection {
608 raw: connection.writer.raw,
609 _writing: PhantomData,
610 },
611 }
612 }
613
614 pub fn wrap(stream: TcpStream) -> Connection<R1, W1> {
616 let (read_stream, write_stream) = stream.into_split();
617
618 Connection {
619 reader: ReadConnection {
620 raw: RawReadConnection {
621 read_stream,
622 buffer: Cursor::new(Vec::new()),
623 compression_threshold: None,
624 dec_cipher: None,
625 },
626 _reading: PhantomData,
627 },
628 writer: WriteConnection {
629 raw: RawWriteConnection {
630 write_stream,
631 compression_threshold: None,
632 enc_cipher: None,
633 },
634 _writing: PhantomData,
635 },
636 }
637 }
638
639 pub fn unwrap(self) -> Result<TcpStream, ReuniteError> {
641 self.reader
642 .raw
643 .read_stream
644 .reunite(self.writer.raw.write_stream)
645 }
646}