1use std::{
4 fmt::{self, Debug, Display},
5 io::{self, Cursor},
6 marker::PhantomData,
7 net::SocketAddr,
8};
9
10#[cfg(feature = "online-mode")]
11use azalea_auth::{
12 game_profile::GameProfile,
13 sessionserver::{ClientSessionServerError, ServerSessionServerError},
14};
15use azalea_crypto::{Aes128CfbDec, Aes128CfbEnc};
16use thiserror::Error;
17use tokio::{
18 io::{AsyncWriteExt, BufStream},
19 net::{
20 TcpStream,
21 tcp::{OwnedReadHalf, OwnedWriteHalf, ReuniteError},
22 },
23};
24use tracing::{error, info};
25#[cfg(feature = "online-mode")]
26use uuid::Uuid;
27
28#[cfg(feature = "online-mode")]
29use crate::packets::login::ClientboundHello;
30use crate::{
31 packets::{
32 ProtocolPacket,
33 config::{ClientboundConfigPacket, ServerboundConfigPacket},
34 game::{ClientboundGamePacket, ServerboundGamePacket},
35 handshake::{ClientboundHandshakePacket, ServerboundHandshakePacket},
36 login::{ClientboundLoginPacket, ServerboundLoginPacket},
37 status::{ClientboundStatusPacket, ServerboundStatusPacket},
38 },
39 read::{ReadPacketError, deserialize_packet, read_raw_packet, try_read_raw_packet},
40 write::{serialize_packet, write_raw_packet},
41};
42
43pub struct RawReadConnection {
44 pub read_stream: OwnedReadHalf,
45 pub buffer: Cursor<Vec<u8>>,
46 pub compression_threshold: Option<u32>,
47 pub dec_cipher: Option<Aes128CfbDec>,
48}
49
50pub struct RawWriteConnection {
51 pub write_stream: OwnedWriteHalf,
52 pub compression_threshold: Option<u32>,
53 pub enc_cipher: Option<Aes128CfbEnc>,
54}
55
56pub struct ReadConnection<R: ProtocolPacket> {
58 pub raw: RawReadConnection,
59 _reading: PhantomData<R>,
60}
61
62pub struct WriteConnection<W: ProtocolPacket> {
64 pub raw: RawWriteConnection,
65 _writing: PhantomData<W>,
66}
67
68pub struct Connection<R: ProtocolPacket, W: ProtocolPacket> {
145 pub reader: ReadConnection<R>,
146 pub writer: WriteConnection<W>,
147}
148
149impl RawReadConnection {
150 pub async fn read(&mut self) -> Result<Box<[u8]>, Box<ReadPacketError>> {
151 read_raw_packet::<_>(
152 &mut self.read_stream,
153 &mut self.buffer,
154 self.compression_threshold,
155 &mut self.dec_cipher,
156 )
157 .await
158 }
159
160 pub fn try_read(&mut self) -> Result<Option<Box<[u8]>>, Box<ReadPacketError>> {
161 try_read_raw_packet::<_>(
162 &mut self.read_stream,
163 &mut self.buffer,
164 self.compression_threshold,
165 &mut self.dec_cipher,
166 )
167 }
168}
169
170impl RawWriteConnection {
171 pub async fn write(&mut self, packet: &[u8]) -> io::Result<()> {
172 if let Err(e) = write_raw_packet(
173 packet,
174 &mut self.write_stream,
175 self.compression_threshold,
176 &mut self.enc_cipher,
177 )
178 .await
179 {
180 if e.kind() == io::ErrorKind::BrokenPipe {
182 info!("Broken pipe, shutting down connection.");
183 if let Err(e) = self.shutdown().await {
184 error!("Couldn't shut down: {}", e);
185 }
186 }
187 return Err(e);
188 }
189 Ok(())
190 }
191
192 pub async fn shutdown(&mut self) -> io::Result<()> {
194 self.write_stream.shutdown().await
195 }
196}
197
198impl<R> ReadConnection<R>
199where
200 R: ProtocolPacket + Debug,
201{
202 pub async fn read(&mut self) -> Result<R, Box<ReadPacketError>> {
204 let raw_packet = self.raw.read().await?;
205 deserialize_packet(&mut Cursor::new(&raw_packet))
206 }
207
208 pub fn try_read(&mut self) -> Result<Option<R>, Box<ReadPacketError>> {
211 let Some(raw_packet) = self.raw.try_read()? else {
212 return Ok(None);
213 };
214 Ok(Some(deserialize_packet(&mut Cursor::new(&raw_packet))?))
215 }
216}
217impl<W> WriteConnection<W>
218where
219 W: ProtocolPacket + Debug,
220{
221 pub async fn write(&mut self, packet: W) -> io::Result<()> {
223 self.raw.write(&serialize_packet(&packet).unwrap()).await
224 }
225
226 pub async fn shutdown(&mut self) -> io::Result<()> {
228 self.raw.shutdown().await
229 }
230}
231
232impl<R, W> Connection<R, W>
233where
234 R: ProtocolPacket + Debug,
235 W: ProtocolPacket + Debug,
236{
237 pub async fn read(&mut self) -> Result<R, Box<ReadPacketError>> {
239 self.reader.read().await
240 }
241
242 pub fn try_read(&mut self) -> Result<Option<R>, Box<ReadPacketError>> {
245 self.reader.try_read()
246 }
247
248 pub async fn write(&mut self, packet: impl crate::packets::Packet<W>) -> io::Result<()> {
250 let packet = packet.into_variant();
251 self.writer.write(packet).await
252 }
253
254 #[must_use]
258 pub fn into_split(self) -> (ReadConnection<R>, WriteConnection<W>) {
259 (self.reader, self.writer)
260 }
261
262 #[must_use]
267 pub fn into_split_raw(self) -> (RawReadConnection, RawWriteConnection) {
268 (self.reader.raw, self.writer.raw)
269 }
270}
271
272#[derive(Debug, Error)]
273pub enum ConnectionError {
274 #[error("{0}")]
275 Io(#[from] io::Error),
276}
277
278use socks5_impl::protocol::UserKey;
279
280#[derive(Clone, Debug)]
282pub struct Proxy {
283 pub addr: SocketAddr,
284 pub auth: Option<UserKey>,
285}
286
287impl Proxy {
288 pub fn new(addr: SocketAddr, auth: Option<UserKey>) -> Self {
289 Self { addr, auth }
290 }
291}
292impl Display for Proxy {
293 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
294 write!(f, "socks5://")?;
295 if let Some(auth) = &self.auth {
296 write!(f, "{auth}@")?;
297 }
298 write!(f, "{}", self.addr)
299 }
300}
301
#[cfg(feature = "online-mode")]
impl From<Proxy> for reqwest::Proxy {
    /// Builds a `reqwest` proxy that applies to all traffic from the
    /// `socks5://` URL produced by [`Proxy`]'s `Display` implementation.
    ///
    /// # Panics
    ///
    /// Panics if `reqwest` rejects the generated URL.
    fn from(proxy: Proxy) -> Self {
        let url = proxy.to_string();
        Self::all(url).expect("azalea proxies should not fail to parse as reqwest proxies")
    }
}
309
310impl Connection<ClientboundHandshakePacket, ServerboundHandshakePacket> {
311 pub async fn new(address: &SocketAddr) -> Result<Self, ConnectionError> {
313 let stream = TcpStream::connect(address).await?;
314
315 stream.set_nodelay(true)?;
317
318 Self::new_from_stream(stream).await
319 }
320
321 pub async fn new_with_proxy(
325 address: &SocketAddr,
326 proxy: Proxy,
327 ) -> Result<Self, ConnectionError> {
328 let proxy_stream = TcpStream::connect(proxy.addr).await?;
329 let mut stream = BufStream::new(proxy_stream);
330
331 let _ = socks5_impl::client::connect(&mut stream, address, proxy.auth)
332 .await
333 .map_err(io::Error::other)?;
334
335 Self::new_from_stream(stream.into_inner()).await
336 }
337
338 pub async fn new_from_stream(stream: TcpStream) -> Result<Self, ConnectionError> {
343 let (read_stream, write_stream) = stream.into_split();
344
345 Ok(Connection {
346 reader: ReadConnection {
347 raw: RawReadConnection {
348 read_stream,
349 buffer: Cursor::new(Vec::new()),
350 compression_threshold: None,
351 dec_cipher: None,
352 },
353 _reading: PhantomData,
354 },
355 writer: WriteConnection {
356 raw: RawWriteConnection {
357 write_stream,
358 compression_threshold: None,
359 enc_cipher: None,
360 },
361 _writing: PhantomData,
362 },
363 })
364 }
365
366 #[must_use]
370 pub fn login(self) -> Connection<ClientboundLoginPacket, ServerboundLoginPacket> {
371 Connection::from(self)
372 }
373
374 #[must_use]
378 pub fn status(self) -> Connection<ClientboundStatusPacket, ServerboundStatusPacket> {
379 Connection::from(self)
380 }
381}
382
383impl Connection<ClientboundLoginPacket, ServerboundLoginPacket> {
384 pub fn set_compression_threshold(&mut self, threshold: i32) {
390 if threshold >= 0 {
392 self.reader.raw.compression_threshold = Some(threshold as u32);
393 self.writer.raw.compression_threshold = Some(threshold as u32);
394 } else {
395 self.reader.raw.compression_threshold = None;
396 self.writer.raw.compression_threshold = None;
397 }
398 }
399
400 pub fn set_encryption_key(&mut self, key: [u8; 16]) {
404 let (enc_cipher, dec_cipher) = azalea_crypto::create_cipher(&key);
405 self.reader.raw.dec_cipher = Some(dec_cipher);
406 self.writer.raw.enc_cipher = Some(enc_cipher);
407 }
408
409 #[must_use]
414 pub fn config(self) -> Connection<ClientboundConfigPacket, ServerboundConfigPacket> {
415 Connection::from(self)
416 }
417
418 #[cfg(feature = "online-mode")]
469 pub async fn authenticate(
470 &self,
471 access_token: &str,
472 uuid: &Uuid,
473 private_key: [u8; 16],
474 packet: &ClientboundHello,
475 sessionserver_proxy: Option<Proxy>,
476 ) -> Result<(), ClientSessionServerError> {
477 use azalea_auth::sessionserver::{self, SessionServerJoinOpts};
478
479 sessionserver::join(SessionServerJoinOpts {
480 access_token,
481 public_key: &packet.public_key,
482 private_key: &private_key,
483 uuid,
484 server_id: &packet.server_id,
485 proxy: sessionserver_proxy.map(Proxy::into),
486 })
487 .await
488 }
489}
490
491impl Connection<ServerboundHandshakePacket, ClientboundHandshakePacket> {
492 #[must_use]
497 pub fn login(self) -> Connection<ServerboundLoginPacket, ClientboundLoginPacket> {
498 Connection::from(self)
499 }
500
501 #[must_use]
505 pub fn status(self) -> Connection<ServerboundStatusPacket, ClientboundStatusPacket> {
506 Connection::from(self)
507 }
508}
509
510impl Connection<ServerboundLoginPacket, ClientboundLoginPacket> {
511 pub fn set_compression_threshold(&mut self, threshold: i32) {
516 if threshold >= 0 {
518 self.reader.raw.compression_threshold = Some(threshold as u32);
519 self.writer.raw.compression_threshold = Some(threshold as u32);
520 } else {
521 self.reader.raw.compression_threshold = None;
522 self.writer.raw.compression_threshold = None;
523 }
524 }
525
526 pub fn set_encryption_key(&mut self, key: [u8; 16]) {
530 let (enc_cipher, dec_cipher) = azalea_crypto::create_cipher(&key);
531 self.reader.raw.dec_cipher = Some(dec_cipher);
532 self.writer.raw.enc_cipher = Some(enc_cipher);
533 }
534
535 #[must_use]
539 pub fn game(self) -> Connection<ServerboundGamePacket, ClientboundGamePacket> {
540 Connection::from(self)
541 }
542
543 #[cfg(feature = "online-mode")]
547 pub async fn authenticate(
548 &self,
549 username: &str,
550 public_key: &[u8],
551 private_key: &[u8; 16],
552 ip: Option<&str>,
553 ) -> Result<GameProfile, ServerSessionServerError> {
554 azalea_auth::sessionserver::serverside_auth(username, public_key, private_key, ip).await
555 }
556
557 #[must_use]
559 pub fn config(self) -> Connection<ServerboundConfigPacket, ClientboundConfigPacket> {
560 Connection::from(self)
561 }
562}
563
564impl Connection<ServerboundConfigPacket, ClientboundConfigPacket> {
565 #[must_use]
569 pub fn game(self) -> Connection<ServerboundGamePacket, ClientboundGamePacket> {
570 Connection::from(self)
571 }
572}
573
574impl Connection<ClientboundConfigPacket, ServerboundConfigPacket> {
575 #[must_use]
579 pub fn game(self) -> Connection<ClientboundGamePacket, ServerboundGamePacket> {
580 Connection::from(self)
581 }
582}
583
584impl Connection<ClientboundGamePacket, ServerboundGamePacket> {
585 #[must_use]
587 pub fn config(self) -> Connection<ClientboundConfigPacket, ServerboundConfigPacket> {
588 Connection::from(self)
589 }
590}
591impl Connection<ServerboundGamePacket, ClientboundGamePacket> {
592 #[must_use]
594 pub fn config(self) -> Connection<ServerboundConfigPacket, ClientboundConfigPacket> {
595 Connection::from(self)
596 }
597}
598
599impl<R1, W1> Connection<R1, W1>
602where
603 R1: ProtocolPacket + Debug,
604 W1: ProtocolPacket + Debug,
605{
606 #[must_use]
609 pub fn from<R2, W2>(connection: Connection<R1, W1>) -> Connection<R2, W2>
610 where
611 R2: ProtocolPacket + Debug,
612 W2: ProtocolPacket + Debug,
613 {
614 Connection {
615 reader: ReadConnection {
616 raw: connection.reader.raw,
617 _reading: PhantomData,
618 },
619 writer: WriteConnection {
620 raw: connection.writer.raw,
621 _writing: PhantomData,
622 },
623 }
624 }
625
626 pub fn wrap(stream: TcpStream) -> Connection<R1, W1> {
628 let (read_stream, write_stream) = stream.into_split();
629
630 Connection {
631 reader: ReadConnection {
632 raw: RawReadConnection {
633 read_stream,
634 buffer: Cursor::new(Vec::new()),
635 compression_threshold: None,
636 dec_cipher: None,
637 },
638 _reading: PhantomData,
639 },
640 writer: WriteConnection {
641 raw: RawWriteConnection {
642 write_stream,
643 compression_threshold: None,
644 enc_cipher: None,
645 },
646 _writing: PhantomData,
647 },
648 }
649 }
650
651 pub fn unwrap(self) -> Result<TcpStream, ReuniteError> {
653 self.reader
654 .raw
655 .read_stream
656 .reunite(self.writer.raw.write_stream)
657 }
658}