1use std::{
4 fmt::{self, Debug, Display},
5 io::{self, Cursor},
6 marker::PhantomData,
7 net::SocketAddr,
8};
9
10#[cfg(feature = "online-mode")]
11use azalea_auth::{
12 game_profile::GameProfile,
13 sessionserver::{ClientSessionServerError, ServerSessionServerError},
14};
15use azalea_crypto::{Aes128CfbDec, Aes128CfbEnc};
16use thiserror::Error;
17use tokio::{
18 io::{AsyncWriteExt, BufStream},
19 net::{
20 TcpStream,
21 tcp::{OwnedReadHalf, OwnedWriteHalf, ReuniteError},
22 },
23};
24use tracing::{error, info};
25#[cfg(feature = "online-mode")]
26use uuid::Uuid;
27
28#[cfg(feature = "online-mode")]
29use crate::packets::login::ClientboundHello;
30use crate::{
31 packets::{
32 ProtocolPacket,
33 config::{ClientboundConfigPacket, ServerboundConfigPacket},
34 game::{ClientboundGamePacket, ServerboundGamePacket},
35 handshake::{ClientboundHandshakePacket, ServerboundHandshakePacket},
36 login::{ClientboundLoginPacket, ServerboundLoginPacket},
37 status::{ClientboundStatusPacket, ServerboundStatusPacket},
38 },
39 read::{ReadPacketError, deserialize_packet, read_raw_packet, try_read_raw_packet},
40 write::{serialize_packet, write_raw_packet},
41};
42
43pub struct RawReadConnection {
44 pub read_stream: OwnedReadHalf,
45 pub buffer: Cursor<Vec<u8>>,
46 pub compression_threshold: Option<u32>,
47 pub dec_cipher: Option<Aes128CfbDec>,
48}
49
50pub struct RawWriteConnection {
51 pub write_stream: OwnedWriteHalf,
52 pub compression_threshold: Option<u32>,
53 pub enc_cipher: Option<Aes128CfbEnc>,
54}
55
56pub struct ReadConnection<R: ProtocolPacket> {
58 pub raw: RawReadConnection,
59 _reading: PhantomData<R>,
60}
61
62pub struct WriteConnection<W: ProtocolPacket> {
64 pub raw: RawWriteConnection,
65 _writing: PhantomData<W>,
66}
67
68pub struct Connection<R: ProtocolPacket, W: ProtocolPacket> {
145 pub reader: ReadConnection<R>,
146 pub writer: WriteConnection<W>,
147}
148
149impl RawReadConnection {
150 pub async fn read(&mut self) -> Result<Box<[u8]>, Box<ReadPacketError>> {
151 read_raw_packet::<_>(
152 &mut self.read_stream,
153 &mut self.buffer,
154 self.compression_threshold,
155 &mut self.dec_cipher,
156 )
157 .await
158 }
159
160 pub fn try_read(&mut self) -> Result<Option<Box<[u8]>>, Box<ReadPacketError>> {
161 try_read_raw_packet::<_>(
162 &mut self.read_stream,
163 &mut self.buffer,
164 self.compression_threshold,
165 &mut self.dec_cipher,
166 )
167 }
168}
169
170impl RawWriteConnection {
171 pub async fn write(&mut self, packet: &[u8]) -> io::Result<()> {
172 if let Err(e) = write_raw_packet(
173 packet,
174 &mut self.write_stream,
175 self.compression_threshold,
176 &mut self.enc_cipher,
177 )
178 .await
179 {
180 if e.kind() == io::ErrorKind::BrokenPipe {
182 info!("Broken pipe, shutting down connection.");
183 if let Err(e) = self.shutdown().await {
184 error!("Couldn't shut down: {}", e);
185 }
186 }
187 return Err(e);
188 }
189 Ok(())
190 }
191
192 pub async fn shutdown(&mut self) -> io::Result<()> {
194 self.write_stream.shutdown().await
195 }
196}
197
198impl<R> ReadConnection<R>
199where
200 R: ProtocolPacket + Debug,
201{
202 pub async fn read(&mut self) -> Result<R, Box<ReadPacketError>> {
204 let raw_packet = self.raw.read().await?;
205 deserialize_packet(&mut Cursor::new(&raw_packet))
206 }
207
208 pub fn try_read(&mut self) -> Result<Option<R>, Box<ReadPacketError>> {
211 let Some(raw_packet) = self.raw.try_read()? else {
212 return Ok(None);
213 };
214 Ok(Some(deserialize_packet(&mut Cursor::new(&raw_packet))?))
215 }
216}
217impl<W> WriteConnection<W>
218where
219 W: ProtocolPacket + Debug,
220{
221 pub async fn write(&mut self, packet: W) -> io::Result<()> {
223 self.raw.write(&serialize_packet(&packet).unwrap()).await
224 }
225
226 pub async fn write_batch(&mut self, packets: &[W]) -> io::Result<()> {
228 let serialized_packets: Vec<u8> = packets
229 .into_iter()
230 .flat_map(|packet| serialize_packet(packet).unwrap())
231 .collect();
232 self.raw.write(&serialized_packets).await
233 }
234
235 pub async fn shutdown(&mut self) -> io::Result<()> {
237 self.raw.shutdown().await
238 }
239}
240
241impl<R, W> Connection<R, W>
242where
243 R: ProtocolPacket + Debug,
244 W: ProtocolPacket + Debug,
245{
246 pub async fn read(&mut self) -> Result<R, Box<ReadPacketError>> {
248 self.reader.read().await
249 }
250
251 pub fn try_read(&mut self) -> Result<Option<R>, Box<ReadPacketError>> {
254 self.reader.try_read()
255 }
256
257 pub async fn write(&mut self, packet: impl crate::packets::Packet<W>) -> io::Result<()> {
259 let packet = packet.into_variant();
260 self.writer.write(packet).await
261 }
262
263 pub async fn write_batch(&mut self, packets: &[W]) -> io::Result<()> {
265 self.writer.write_batch(packets).await
266 }
267
268 #[must_use]
272 pub fn into_split(self) -> (ReadConnection<R>, WriteConnection<W>) {
273 (self.reader, self.writer)
274 }
275
276 #[must_use]
281 pub fn into_split_raw(self) -> (RawReadConnection, RawWriteConnection) {
282 (self.reader.raw, self.writer.raw)
283 }
284}
285
286#[derive(Debug, Error)]
287pub enum ConnectionError {
288 #[error("{0}")]
289 Io(#[from] io::Error),
290}
291
292use socks5_impl::protocol::UserKey;
293
294#[derive(Clone, Debug)]
296pub struct Proxy {
297 pub addr: SocketAddr,
298 pub auth: Option<UserKey>,
299}
300
301impl Proxy {
302 pub fn new(addr: SocketAddr, auth: Option<UserKey>) -> Self {
303 Self { addr, auth }
304 }
305}
306impl Display for Proxy {
307 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
308 write!(f, "socks5://")?;
309 if let Some(auth) = &self.auth {
310 write!(f, "{auth}@")?;
311 }
312 write!(f, "{}", self.addr)
313 }
314}
315
316#[cfg(feature = "online-mode")]
317impl From<Proxy> for reqwest::Proxy {
318 fn from(proxy: Proxy) -> Self {
319 reqwest::Proxy::all(proxy.to_string())
320 .expect("azalea proxies should not fail to parse as reqwest proxies")
321 }
322}
323
324impl Connection<ClientboundHandshakePacket, ServerboundHandshakePacket> {
325 pub async fn new(address: &SocketAddr) -> Result<Self, ConnectionError> {
327 let stream = TcpStream::connect(address).await?;
328
329 stream.set_nodelay(true)?;
331
332 Self::new_from_stream(stream).await
333 }
334
335 pub async fn new_with_proxy(
339 address: &SocketAddr,
340 proxy: Proxy,
341 ) -> Result<Self, ConnectionError> {
342 let proxy_stream = TcpStream::connect(proxy.addr).await?;
343 let mut stream = BufStream::new(proxy_stream);
344
345 let _ = socks5_impl::client::connect(&mut stream, address, proxy.auth)
346 .await
347 .map_err(io::Error::other)?;
348
349 Self::new_from_stream(stream.into_inner()).await
350 }
351
352 pub async fn new_from_stream(stream: TcpStream) -> Result<Self, ConnectionError> {
357 let (read_stream, write_stream) = stream.into_split();
358
359 Ok(Connection {
360 reader: ReadConnection {
361 raw: RawReadConnection {
362 read_stream,
363 buffer: Cursor::new(Vec::new()),
364 compression_threshold: None,
365 dec_cipher: None,
366 },
367 _reading: PhantomData,
368 },
369 writer: WriteConnection {
370 raw: RawWriteConnection {
371 write_stream,
372 compression_threshold: None,
373 enc_cipher: None,
374 },
375 _writing: PhantomData,
376 },
377 })
378 }
379
380 #[must_use]
384 pub fn login(self) -> Connection<ClientboundLoginPacket, ServerboundLoginPacket> {
385 Connection::from(self)
386 }
387
388 #[must_use]
392 pub fn status(self) -> Connection<ClientboundStatusPacket, ServerboundStatusPacket> {
393 Connection::from(self)
394 }
395}
396
397impl Connection<ClientboundLoginPacket, ServerboundLoginPacket> {
398 pub fn set_compression_threshold(&mut self, threshold: i32) {
404 if threshold >= 0 {
406 self.reader.raw.compression_threshold = Some(threshold as u32);
407 self.writer.raw.compression_threshold = Some(threshold as u32);
408 } else {
409 self.reader.raw.compression_threshold = None;
410 self.writer.raw.compression_threshold = None;
411 }
412 }
413
414 pub fn set_encryption_key(&mut self, key: [u8; 16]) {
418 let (enc_cipher, dec_cipher) = azalea_crypto::create_cipher(&key);
419 self.reader.raw.dec_cipher = Some(dec_cipher);
420 self.writer.raw.enc_cipher = Some(enc_cipher);
421 }
422
423 #[must_use]
428 pub fn config(self) -> Connection<ClientboundConfigPacket, ServerboundConfigPacket> {
429 Connection::from(self)
430 }
431
432 #[cfg(feature = "online-mode")]
483 pub async fn authenticate(
484 &self,
485 access_token: &str,
486 uuid: &Uuid,
487 private_key: [u8; 16],
488 packet: &ClientboundHello,
489 sessionserver_proxy: Option<Proxy>,
490 ) -> Result<(), ClientSessionServerError> {
491 use azalea_auth::sessionserver::{self, SessionServerJoinOpts};
492
493 sessionserver::join(SessionServerJoinOpts {
494 access_token,
495 public_key: &packet.public_key,
496 private_key: &private_key,
497 uuid,
498 server_id: &packet.server_id,
499 proxy: sessionserver_proxy.map(Proxy::into),
500 })
501 .await
502 }
503}
504
505impl Connection<ServerboundHandshakePacket, ClientboundHandshakePacket> {
506 #[must_use]
511 pub fn login(self) -> Connection<ServerboundLoginPacket, ClientboundLoginPacket> {
512 Connection::from(self)
513 }
514
515 #[must_use]
519 pub fn status(self) -> Connection<ServerboundStatusPacket, ClientboundStatusPacket> {
520 Connection::from(self)
521 }
522}
523
524impl Connection<ServerboundLoginPacket, ClientboundLoginPacket> {
525 pub fn set_compression_threshold(&mut self, threshold: i32) {
530 if threshold >= 0 {
532 self.reader.raw.compression_threshold = Some(threshold as u32);
533 self.writer.raw.compression_threshold = Some(threshold as u32);
534 } else {
535 self.reader.raw.compression_threshold = None;
536 self.writer.raw.compression_threshold = None;
537 }
538 }
539
540 pub fn set_encryption_key(&mut self, key: [u8; 16]) {
544 let (enc_cipher, dec_cipher) = azalea_crypto::create_cipher(&key);
545 self.reader.raw.dec_cipher = Some(dec_cipher);
546 self.writer.raw.enc_cipher = Some(enc_cipher);
547 }
548
549 #[must_use]
553 pub fn game(self) -> Connection<ServerboundGamePacket, ClientboundGamePacket> {
554 Connection::from(self)
555 }
556
557 #[cfg(feature = "online-mode")]
561 pub async fn authenticate(
562 &self,
563 username: &str,
564 public_key: &[u8],
565 private_key: &[u8; 16],
566 ip: Option<&str>,
567 ) -> Result<GameProfile, ServerSessionServerError> {
568 azalea_auth::sessionserver::serverside_auth(username, public_key, private_key, ip).await
569 }
570
571 #[must_use]
573 pub fn config(self) -> Connection<ServerboundConfigPacket, ClientboundConfigPacket> {
574 Connection::from(self)
575 }
576}
577
578impl Connection<ServerboundConfigPacket, ClientboundConfigPacket> {
579 #[must_use]
583 pub fn game(self) -> Connection<ServerboundGamePacket, ClientboundGamePacket> {
584 Connection::from(self)
585 }
586}
587
588impl Connection<ClientboundConfigPacket, ServerboundConfigPacket> {
589 #[must_use]
593 pub fn game(self) -> Connection<ClientboundGamePacket, ServerboundGamePacket> {
594 Connection::from(self)
595 }
596}
597
598impl Connection<ClientboundGamePacket, ServerboundGamePacket> {
599 #[must_use]
601 pub fn config(self) -> Connection<ClientboundConfigPacket, ServerboundConfigPacket> {
602 Connection::from(self)
603 }
604}
605impl Connection<ServerboundGamePacket, ClientboundGamePacket> {
606 #[must_use]
608 pub fn config(self) -> Connection<ServerboundConfigPacket, ClientboundConfigPacket> {
609 Connection::from(self)
610 }
611}
612
613impl<R1, W1> Connection<R1, W1>
616where
617 R1: ProtocolPacket + Debug,
618 W1: ProtocolPacket + Debug,
619{
620 #[must_use]
623 pub fn from<R2, W2>(connection: Connection<R1, W1>) -> Connection<R2, W2>
624 where
625 R2: ProtocolPacket + Debug,
626 W2: ProtocolPacket + Debug,
627 {
628 Connection {
629 reader: ReadConnection {
630 raw: connection.reader.raw,
631 _reading: PhantomData,
632 },
633 writer: WriteConnection {
634 raw: connection.writer.raw,
635 _writing: PhantomData,
636 },
637 }
638 }
639
640 pub fn wrap(stream: TcpStream) -> Connection<R1, W1> {
642 let (read_stream, write_stream) = stream.into_split();
643
644 Connection {
645 reader: ReadConnection {
646 raw: RawReadConnection {
647 read_stream,
648 buffer: Cursor::new(Vec::new()),
649 compression_threshold: None,
650 dec_cipher: None,
651 },
652 _reading: PhantomData,
653 },
654 writer: WriteConnection {
655 raw: RawWriteConnection {
656 write_stream,
657 compression_threshold: None,
658 enc_cipher: None,
659 },
660 _writing: PhantomData,
661 },
662 }
663 }
664
665 pub fn unwrap(self) -> Result<TcpStream, ReuniteError> {
667 self.reader
668 .raw
669 .read_stream
670 .reunite(self.writer.raw.write_stream)
671 }
672}