1use std::{
4 fmt::{self, Debug, Display},
5 io::{self, Cursor},
6 marker::PhantomData,
7 net::SocketAddr,
8};
9
10#[cfg(feature = "online-mode")]
11use azalea_auth::{
12 game_profile::GameProfile,
13 sessionserver::{ClientSessionServerError, ServerSessionServerError},
14};
15use azalea_crypto::{Aes128CfbDec, Aes128CfbEnc};
16use thiserror::Error;
17use tokio::{
18 io::{AsyncWriteExt, BufStream},
19 net::{
20 TcpStream,
21 tcp::{OwnedReadHalf, OwnedWriteHalf, ReuniteError},
22 },
23};
24use tracing::{error, info};
25#[cfg(feature = "online-mode")]
26use uuid::Uuid;
27
28#[cfg(feature = "online-mode")]
29use crate::packets::login::ClientboundHello;
30use crate::{
31 packets::{
32 ProtocolPacket,
33 config::{ClientboundConfigPacket, ServerboundConfigPacket},
34 game::{ClientboundGamePacket, ServerboundGamePacket},
35 handshake::{ClientboundHandshakePacket, ServerboundHandshakePacket},
36 login::{ClientboundLoginPacket, ServerboundLoginPacket},
37 status::{ClientboundStatusPacket, ServerboundStatusPacket},
38 },
39 read::{ReadPacketError, deserialize_packet, read_raw_packet, try_read_raw_packet},
40 write::{serialize_packet, write_raw_packet, write_raw_packets},
41};
42
43pub struct RawReadConnection {
44 pub read_stream: OwnedReadHalf,
45 pub buffer: Cursor<Vec<u8>>,
46 pub compression_threshold: Option<u32>,
47 pub dec_cipher: Option<Aes128CfbDec>,
48}
49
50pub struct RawWriteConnection {
51 pub write_stream: OwnedWriteHalf,
52 pub compression_threshold: Option<u32>,
53 pub enc_cipher: Option<Aes128CfbEnc>,
54}
55
56pub struct ReadConnection<R: ProtocolPacket> {
58 pub raw: RawReadConnection,
59 _reading: PhantomData<R>,
60}
61
62pub struct WriteConnection<W: ProtocolPacket> {
64 pub raw: RawWriteConnection,
65 _writing: PhantomData<W>,
66}
67
68pub struct Connection<R: ProtocolPacket, W: ProtocolPacket> {
145 pub reader: ReadConnection<R>,
146 pub writer: WriteConnection<W>,
147}
148
149impl RawReadConnection {
150 pub async fn read(&mut self) -> Result<Box<[u8]>, Box<ReadPacketError>> {
151 read_raw_packet::<_>(
152 &mut self.read_stream,
153 &mut self.buffer,
154 self.compression_threshold,
155 &mut self.dec_cipher,
156 )
157 .await
158 }
159
160 pub fn try_read(&mut self) -> Result<Option<Box<[u8]>>, Box<ReadPacketError>> {
161 try_read_raw_packet::<_>(
162 &mut self.read_stream,
163 &mut self.buffer,
164 self.compression_threshold,
165 &mut self.dec_cipher,
166 )
167 }
168}
169
170impl RawWriteConnection {
171 pub async fn write(&mut self, packet: &[u8]) -> io::Result<()> {
172 if let Err(e) = write_raw_packet(
173 packet,
174 &mut self.write_stream,
175 self.compression_threshold,
176 &mut self.enc_cipher,
177 )
178 .await
179 {
180 if e.kind() == io::ErrorKind::BrokenPipe {
182 info!("Broken pipe, shutting down connection.");
183 if let Err(e) = self.shutdown().await {
184 error!("Couldn't shut down: {}", e);
185 }
186 }
187 return Err(e);
188 }
189 Ok(())
190 }
191
192 pub async fn write_batch(&mut self, packets: impl Iterator<Item = &[u8]>) -> io::Result<()> {
193 if let Err(e) = write_raw_packets(
194 packets,
195 &mut self.write_stream,
196 self.compression_threshold,
197 &mut self.enc_cipher,
198 )
199 .await
200 {
201 if e.kind() == io::ErrorKind::BrokenPipe {
203 info!("Broken pipe, shutting down connection.");
204 if let Err(e) = self.shutdown().await {
205 error!("Couldn't shut down: {}", e);
206 }
207 }
208 return Err(e);
209 }
210 Ok(())
211 }
212
213 pub async fn shutdown(&mut self) -> io::Result<()> {
215 self.write_stream.shutdown().await
216 }
217}
218
219impl<R> ReadConnection<R>
220where
221 R: ProtocolPacket + Debug,
222{
223 pub async fn read(&mut self) -> Result<R, Box<ReadPacketError>> {
225 let raw_packet = self.raw.read().await?;
226 deserialize_packet(&mut Cursor::new(&raw_packet))
227 }
228
229 pub fn try_read(&mut self) -> Result<Option<R>, Box<ReadPacketError>> {
232 let Some(raw_packet) = self.raw.try_read()? else {
233 return Ok(None);
234 };
235 Ok(Some(deserialize_packet(&mut Cursor::new(&raw_packet))?))
236 }
237}
238impl<W> WriteConnection<W>
239where
240 W: ProtocolPacket + Debug,
241{
242 pub async fn write(&mut self, packet: W) -> io::Result<()> {
244 self.raw.write(&serialize_packet(&packet).unwrap()).await
245 }
246
247 pub async fn write_batch(&mut self, packets: &[W]) -> io::Result<()> {
249 let serialized_packets: Vec<Box<[u8]>> = packets
250 .iter()
251 .map(|packet| serialize_packet(packet).unwrap())
252 .collect();
253 self.raw
254 .write_batch(serialized_packets.iter().map(|data| data.as_ref()))
255 .await
256 }
257
258 pub async fn shutdown(&mut self) -> io::Result<()> {
260 self.raw.shutdown().await
261 }
262}
263
264impl<R, W> Connection<R, W>
265where
266 R: ProtocolPacket + Debug,
267 W: ProtocolPacket + Debug,
268{
269 pub async fn read(&mut self) -> Result<R, Box<ReadPacketError>> {
271 self.reader.read().await
272 }
273
274 pub fn try_read(&mut self) -> Result<Option<R>, Box<ReadPacketError>> {
277 self.reader.try_read()
278 }
279
280 pub async fn write(&mut self, packet: impl crate::packets::Packet<W>) -> io::Result<()> {
282 let packet = packet.into_variant();
283 self.writer.write(packet).await
284 }
285
286 pub async fn write_batch(&mut self, packets: &[W]) -> io::Result<()> {
288 self.writer.write_batch(packets).await
289 }
290
291 #[must_use]
295 pub fn into_split(self) -> (ReadConnection<R>, WriteConnection<W>) {
296 (self.reader, self.writer)
297 }
298
299 #[must_use]
304 pub fn into_split_raw(self) -> (RawReadConnection, RawWriteConnection) {
305 (self.reader.raw, self.writer.raw)
306 }
307}
308
309#[derive(Debug, Error)]
310pub enum ConnectionError {
311 #[error("{0}")]
312 Io(#[from] io::Error),
313}
314
315use socks5_impl::protocol::UserKey;
316
317#[derive(Clone, Debug)]
319pub struct Proxy {
320 pub addr: SocketAddr,
321 pub auth: Option<UserKey>,
322}
323
324impl Proxy {
325 pub fn new(addr: SocketAddr, auth: Option<UserKey>) -> Self {
326 Self { addr, auth }
327 }
328}
329impl Display for Proxy {
330 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
331 write!(f, "socks5://")?;
332 if let Some(auth) = &self.auth {
333 write!(f, "{auth}@")?;
334 }
335 write!(f, "{}", self.addr)
336 }
337}
338
339#[cfg(feature = "online-mode")]
340impl From<Proxy> for reqwest::Proxy {
341 fn from(proxy: Proxy) -> Self {
342 reqwest::Proxy::all(proxy.to_string())
343 .expect("azalea proxies should not fail to parse as reqwest proxies")
344 }
345}
346
347impl Connection<ClientboundHandshakePacket, ServerboundHandshakePacket> {
348 pub async fn new(address: &SocketAddr) -> Result<Self, ConnectionError> {
350 let stream = TcpStream::connect(address).await?;
351
352 stream.set_nodelay(true)?;
354
355 Self::new_from_stream(stream).await
356 }
357
358 pub async fn new_with_proxy(
362 address: &SocketAddr,
363 proxy: Proxy,
364 ) -> Result<Self, ConnectionError> {
365 let proxy_stream = TcpStream::connect(proxy.addr).await?;
366 let mut stream = BufStream::new(proxy_stream);
367
368 let _ = socks5_impl::client::connect(&mut stream, address, proxy.auth)
369 .await
370 .map_err(io::Error::other)?;
371
372 Self::new_from_stream(stream.into_inner()).await
373 }
374
375 pub async fn new_from_stream(stream: TcpStream) -> Result<Self, ConnectionError> {
380 let (read_stream, write_stream) = stream.into_split();
381
382 Ok(Connection {
383 reader: ReadConnection {
384 raw: RawReadConnection {
385 read_stream,
386 buffer: Cursor::new(Vec::new()),
387 compression_threshold: None,
388 dec_cipher: None,
389 },
390 _reading: PhantomData,
391 },
392 writer: WriteConnection {
393 raw: RawWriteConnection {
394 write_stream,
395 compression_threshold: None,
396 enc_cipher: None,
397 },
398 _writing: PhantomData,
399 },
400 })
401 }
402
403 #[must_use]
407 pub fn login(self) -> Connection<ClientboundLoginPacket, ServerboundLoginPacket> {
408 Connection::from(self)
409 }
410
411 #[must_use]
415 pub fn status(self) -> Connection<ClientboundStatusPacket, ServerboundStatusPacket> {
416 Connection::from(self)
417 }
418}
419
420impl Connection<ClientboundLoginPacket, ServerboundLoginPacket> {
421 pub fn set_compression_threshold(&mut self, threshold: i32) {
427 if threshold >= 0 {
429 self.reader.raw.compression_threshold = Some(threshold as u32);
430 self.writer.raw.compression_threshold = Some(threshold as u32);
431 } else {
432 self.reader.raw.compression_threshold = None;
433 self.writer.raw.compression_threshold = None;
434 }
435 }
436
437 pub fn set_encryption_key(&mut self, key: [u8; 16]) {
441 let (enc_cipher, dec_cipher) = azalea_crypto::create_cipher(&key);
442 self.reader.raw.dec_cipher = Some(dec_cipher);
443 self.writer.raw.enc_cipher = Some(enc_cipher);
444 }
445
446 #[must_use]
451 pub fn config(self) -> Connection<ClientboundConfigPacket, ServerboundConfigPacket> {
452 Connection::from(self)
453 }
454
455 #[cfg(feature = "online-mode")]
506 pub async fn authenticate(
507 &self,
508 access_token: &str,
509 uuid: &Uuid,
510 private_key: [u8; 16],
511 packet: &ClientboundHello,
512 sessionserver_proxy: Option<Proxy>,
513 ) -> Result<(), ClientSessionServerError> {
514 use azalea_auth::sessionserver::{self, SessionServerJoinOpts};
515
516 sessionserver::join(SessionServerJoinOpts {
517 access_token,
518 public_key: &packet.public_key,
519 private_key: &private_key,
520 uuid,
521 server_id: &packet.server_id,
522 proxy: sessionserver_proxy.map(Proxy::into),
523 })
524 .await
525 }
526}
527
528impl Connection<ServerboundHandshakePacket, ClientboundHandshakePacket> {
529 #[must_use]
534 pub fn login(self) -> Connection<ServerboundLoginPacket, ClientboundLoginPacket> {
535 Connection::from(self)
536 }
537
538 #[must_use]
542 pub fn status(self) -> Connection<ServerboundStatusPacket, ClientboundStatusPacket> {
543 Connection::from(self)
544 }
545}
546
547impl Connection<ServerboundLoginPacket, ClientboundLoginPacket> {
548 pub fn set_compression_threshold(&mut self, threshold: i32) {
553 if threshold >= 0 {
555 self.reader.raw.compression_threshold = Some(threshold as u32);
556 self.writer.raw.compression_threshold = Some(threshold as u32);
557 } else {
558 self.reader.raw.compression_threshold = None;
559 self.writer.raw.compression_threshold = None;
560 }
561 }
562
563 pub fn set_encryption_key(&mut self, key: [u8; 16]) {
567 let (enc_cipher, dec_cipher) = azalea_crypto::create_cipher(&key);
568 self.reader.raw.dec_cipher = Some(dec_cipher);
569 self.writer.raw.enc_cipher = Some(enc_cipher);
570 }
571
572 #[must_use]
576 pub fn game(self) -> Connection<ServerboundGamePacket, ClientboundGamePacket> {
577 Connection::from(self)
578 }
579
580 #[cfg(feature = "online-mode")]
584 pub async fn authenticate(
585 &self,
586 username: &str,
587 public_key: &[u8],
588 private_key: &[u8; 16],
589 ip: Option<&str>,
590 ) -> Result<GameProfile, ServerSessionServerError> {
591 azalea_auth::sessionserver::serverside_auth(username, public_key, private_key, ip).await
592 }
593
594 #[must_use]
596 pub fn config(self) -> Connection<ServerboundConfigPacket, ClientboundConfigPacket> {
597 Connection::from(self)
598 }
599}
600
601impl Connection<ServerboundConfigPacket, ClientboundConfigPacket> {
602 #[must_use]
606 pub fn game(self) -> Connection<ServerboundGamePacket, ClientboundGamePacket> {
607 Connection::from(self)
608 }
609}
610
611impl Connection<ClientboundConfigPacket, ServerboundConfigPacket> {
612 #[must_use]
616 pub fn game(self) -> Connection<ClientboundGamePacket, ServerboundGamePacket> {
617 Connection::from(self)
618 }
619}
620
621impl Connection<ClientboundGamePacket, ServerboundGamePacket> {
622 #[must_use]
624 pub fn config(self) -> Connection<ClientboundConfigPacket, ServerboundConfigPacket> {
625 Connection::from(self)
626 }
627}
628impl Connection<ServerboundGamePacket, ClientboundGamePacket> {
629 #[must_use]
631 pub fn config(self) -> Connection<ServerboundConfigPacket, ClientboundConfigPacket> {
632 Connection::from(self)
633 }
634}
635
636impl<R1, W1> Connection<R1, W1>
639where
640 R1: ProtocolPacket + Debug,
641 W1: ProtocolPacket + Debug,
642{
643 #[must_use]
646 pub fn from<R2, W2>(connection: Connection<R1, W1>) -> Connection<R2, W2>
647 where
648 R2: ProtocolPacket + Debug,
649 W2: ProtocolPacket + Debug,
650 {
651 Connection {
652 reader: ReadConnection {
653 raw: connection.reader.raw,
654 _reading: PhantomData,
655 },
656 writer: WriteConnection {
657 raw: connection.writer.raw,
658 _writing: PhantomData,
659 },
660 }
661 }
662
663 pub fn wrap(stream: TcpStream) -> Connection<R1, W1> {
665 let (read_stream, write_stream) = stream.into_split();
666
667 Connection {
668 reader: ReadConnection {
669 raw: RawReadConnection {
670 read_stream,
671 buffer: Cursor::new(Vec::new()),
672 compression_threshold: None,
673 dec_cipher: None,
674 },
675 _reading: PhantomData,
676 },
677 writer: WriteConnection {
678 raw: RawWriteConnection {
679 write_stream,
680 compression_threshold: None,
681 enc_cipher: None,
682 },
683 _writing: PhantomData,
684 },
685 }
686 }
687
688 pub fn unwrap(self) -> Result<TcpStream, ReuniteError> {
690 self.reader
691 .raw
692 .read_stream
693 .reunite(self.writer.raw.write_stream)
694 }
695}