1use std::{
4 fmt::{self, Debug, Display},
5 io::{self, Cursor},
6 marker::PhantomData,
7 net::SocketAddr,
8};
9
10use azalea_auth::{
11 game_profile::GameProfile,
12 sessionserver::{ClientSessionServerError, ServerSessionServerError},
13};
14use azalea_crypto::{Aes128CfbDec, Aes128CfbEnc};
15use thiserror::Error;
16use tokio::{
17 io::{AsyncWriteExt, BufStream},
18 net::{
19 TcpStream,
20 tcp::{OwnedReadHalf, OwnedWriteHalf, ReuniteError},
21 },
22};
23use tracing::{error, info};
24use uuid::Uuid;
25
26use crate::{
27 packets::{
28 ProtocolPacket,
29 config::{ClientboundConfigPacket, ServerboundConfigPacket},
30 game::{ClientboundGamePacket, ServerboundGamePacket},
31 handshake::{ClientboundHandshakePacket, ServerboundHandshakePacket},
32 login::{ClientboundLoginPacket, ServerboundLoginPacket, c_hello::ClientboundHello},
33 status::{ClientboundStatusPacket, ServerboundStatusPacket},
34 },
35 read::{ReadPacketError, deserialize_packet, read_raw_packet, try_read_raw_packet},
36 write::{serialize_packet, write_raw_packet},
37};
38
39pub struct RawReadConnection {
40 pub read_stream: OwnedReadHalf,
41 pub buffer: Cursor<Vec<u8>>,
42 pub compression_threshold: Option<u32>,
43 pub dec_cipher: Option<Aes128CfbDec>,
44}
45
46pub struct RawWriteConnection {
47 pub write_stream: OwnedWriteHalf,
48 pub compression_threshold: Option<u32>,
49 pub enc_cipher: Option<Aes128CfbEnc>,
50}
51
52pub struct ReadConnection<R: ProtocolPacket> {
54 pub raw: RawReadConnection,
55 _reading: PhantomData<R>,
56}
57
58pub struct WriteConnection<W: ProtocolPacket> {
60 pub raw: RawWriteConnection,
61 _writing: PhantomData<W>,
62}
63
64pub struct Connection<R: ProtocolPacket, W: ProtocolPacket> {
141 pub reader: ReadConnection<R>,
142 pub writer: WriteConnection<W>,
143}
144
145impl RawReadConnection {
146 pub async fn read(&mut self) -> Result<Box<[u8]>, Box<ReadPacketError>> {
147 read_raw_packet::<_>(
148 &mut self.read_stream,
149 &mut self.buffer,
150 self.compression_threshold,
151 &mut self.dec_cipher,
152 )
153 .await
154 }
155
156 pub fn try_read(&mut self) -> Result<Option<Box<[u8]>>, Box<ReadPacketError>> {
157 try_read_raw_packet::<_>(
158 &mut self.read_stream,
159 &mut self.buffer,
160 self.compression_threshold,
161 &mut self.dec_cipher,
162 )
163 }
164}
165
166impl RawWriteConnection {
167 pub async fn write(&mut self, packet: &[u8]) -> io::Result<()> {
168 if let Err(e) = write_raw_packet(
169 packet,
170 &mut self.write_stream,
171 self.compression_threshold,
172 &mut self.enc_cipher,
173 )
174 .await
175 {
176 if e.kind() == io::ErrorKind::BrokenPipe {
178 info!("Broken pipe, shutting down connection.");
179 if let Err(e) = self.shutdown().await {
180 error!("Couldn't shut down: {}", e);
181 }
182 }
183 return Err(e);
184 }
185 Ok(())
186 }
187
188 pub async fn shutdown(&mut self) -> io::Result<()> {
190 self.write_stream.shutdown().await
191 }
192}
193
194impl<R> ReadConnection<R>
195where
196 R: ProtocolPacket + Debug,
197{
198 pub async fn read(&mut self) -> Result<R, Box<ReadPacketError>> {
200 let raw_packet = self.raw.read().await?;
201 deserialize_packet(&mut Cursor::new(&raw_packet))
202 }
203
204 pub fn try_read(&mut self) -> Result<Option<R>, Box<ReadPacketError>> {
207 let Some(raw_packet) = self.raw.try_read()? else {
208 return Ok(None);
209 };
210 Ok(Some(deserialize_packet(&mut Cursor::new(&raw_packet))?))
211 }
212}
213impl<W> WriteConnection<W>
214where
215 W: ProtocolPacket + Debug,
216{
217 pub async fn write(&mut self, packet: W) -> io::Result<()> {
219 self.raw.write(&serialize_packet(&packet).unwrap()).await
220 }
221
222 pub async fn shutdown(&mut self) -> io::Result<()> {
224 self.raw.shutdown().await
225 }
226}
227
228impl<R, W> Connection<R, W>
229where
230 R: ProtocolPacket + Debug,
231 W: ProtocolPacket + Debug,
232{
233 pub async fn read(&mut self) -> Result<R, Box<ReadPacketError>> {
235 self.reader.read().await
236 }
237
238 pub fn try_read(&mut self) -> Result<Option<R>, Box<ReadPacketError>> {
241 self.reader.try_read()
242 }
243
244 pub async fn write(&mut self, packet: impl crate::packets::Packet<W>) -> io::Result<()> {
246 let packet = packet.into_variant();
247 self.writer.write(packet).await
248 }
249
250 #[must_use]
252 pub fn into_split(self) -> (ReadConnection<R>, WriteConnection<W>) {
253 (self.reader, self.writer)
254 }
255
256 #[must_use]
261 pub fn into_split_raw(self) -> (RawReadConnection, RawWriteConnection) {
262 (self.reader.raw, self.writer.raw)
263 }
264}
265
266#[derive(Error, Debug)]
267pub enum ConnectionError {
268 #[error("{0}")]
269 Io(#[from] io::Error),
270}
271
272use socks5_impl::protocol::UserKey;
273
274#[derive(Debug, Clone)]
276pub struct Proxy {
277 pub addr: SocketAddr,
278 pub auth: Option<UserKey>,
279}
280
281impl Proxy {
282 pub fn new(addr: SocketAddr, auth: Option<UserKey>) -> Self {
283 Self { addr, auth }
284 }
285}
286impl Display for Proxy {
287 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
288 write!(f, "socks5://")?;
289 if let Some(auth) = &self.auth {
290 write!(f, "{auth}@")?;
291 }
292 write!(f, "{}", self.addr)
293 }
294}
295
296impl Connection<ClientboundHandshakePacket, ServerboundHandshakePacket> {
297 pub async fn new(address: &SocketAddr) -> Result<Self, ConnectionError> {
299 let stream = TcpStream::connect(address).await?;
300
301 stream.set_nodelay(true)?;
303
304 Self::new_from_stream(stream).await
305 }
306
307 pub async fn new_with_proxy(
310 address: &SocketAddr,
311 proxy: Proxy,
312 ) -> Result<Self, ConnectionError> {
313 let proxy_stream = TcpStream::connect(proxy.addr).await?;
314 let mut stream = BufStream::new(proxy_stream);
315
316 let _ = socks5_impl::client::connect(&mut stream, address, proxy.auth)
317 .await
318 .map_err(io::Error::other)?;
319
320 Self::new_from_stream(stream.into_inner()).await
321 }
322
323 pub async fn new_from_stream(stream: TcpStream) -> Result<Self, ConnectionError> {
326 let (read_stream, write_stream) = stream.into_split();
327
328 Ok(Connection {
329 reader: ReadConnection {
330 raw: RawReadConnection {
331 read_stream,
332 buffer: Cursor::new(Vec::new()),
333 compression_threshold: None,
334 dec_cipher: None,
335 },
336 _reading: PhantomData,
337 },
338 writer: WriteConnection {
339 raw: RawWriteConnection {
340 write_stream,
341 compression_threshold: None,
342 enc_cipher: None,
343 },
344 _writing: PhantomData,
345 },
346 })
347 }
348
349 #[must_use]
352 pub fn login(self) -> Connection<ClientboundLoginPacket, ServerboundLoginPacket> {
353 Connection::from(self)
354 }
355
356 #[must_use]
359 pub fn status(self) -> Connection<ClientboundStatusPacket, ServerboundStatusPacket> {
360 Connection::from(self)
361 }
362}
363
364impl Connection<ClientboundLoginPacket, ServerboundLoginPacket> {
365 pub fn set_compression_threshold(&mut self, threshold: i32) {
370 if threshold >= 0 {
372 self.reader.raw.compression_threshold = Some(threshold as u32);
373 self.writer.raw.compression_threshold = Some(threshold as u32);
374 } else {
375 self.reader.raw.compression_threshold = None;
376 self.writer.raw.compression_threshold = None;
377 }
378 }
379
380 pub fn set_encryption_key(&mut self, key: [u8; 16]) {
383 let (enc_cipher, dec_cipher) = azalea_crypto::create_cipher(&key);
384 self.reader.raw.dec_cipher = Some(dec_cipher);
385 self.writer.raw.enc_cipher = Some(enc_cipher);
386 }
387
388 #[must_use]
391 pub fn config(self) -> Connection<ClientboundConfigPacket, ServerboundConfigPacket> {
392 Connection::from(self)
393 }
394
395 pub async fn authenticate(
445 &self,
446 access_token: &str,
447 uuid: &Uuid,
448 private_key: [u8; 16],
449 packet: &ClientboundHello,
450 ) -> Result<(), ClientSessionServerError> {
451 azalea_auth::sessionserver::join(
452 access_token,
453 &packet.public_key,
454 &private_key,
455 uuid,
456 &packet.server_id,
457 )
458 .await
459 }
460}
461
462impl Connection<ServerboundHandshakePacket, ClientboundHandshakePacket> {
463 #[must_use]
466 pub fn login(self) -> Connection<ServerboundLoginPacket, ClientboundLoginPacket> {
467 Connection::from(self)
468 }
469
470 #[must_use]
473 pub fn status(self) -> Connection<ServerboundStatusPacket, ClientboundStatusPacket> {
474 Connection::from(self)
475 }
476}
477
478impl Connection<ServerboundLoginPacket, ClientboundLoginPacket> {
479 pub fn set_compression_threshold(&mut self, threshold: i32) {
483 if threshold >= 0 {
485 self.reader.raw.compression_threshold = Some(threshold as u32);
486 self.writer.raw.compression_threshold = Some(threshold as u32);
487 } else {
488 self.reader.raw.compression_threshold = None;
489 self.writer.raw.compression_threshold = None;
490 }
491 }
492
493 pub fn set_encryption_key(&mut self, key: [u8; 16]) {
496 let (enc_cipher, dec_cipher) = azalea_crypto::create_cipher(&key);
497 self.reader.raw.dec_cipher = Some(dec_cipher);
498 self.writer.raw.enc_cipher = Some(enc_cipher);
499 }
500
501 #[must_use]
504 pub fn game(self) -> Connection<ServerboundGamePacket, ClientboundGamePacket> {
505 Connection::from(self)
506 }
507
508 pub async fn authenticate(
512 &self,
513 username: &str,
514 public_key: &[u8],
515 private_key: &[u8; 16],
516 ip: Option<&str>,
517 ) -> Result<GameProfile, ServerSessionServerError> {
518 azalea_auth::sessionserver::serverside_auth(username, public_key, private_key, ip).await
519 }
520
521 #[must_use]
523 pub fn config(self) -> Connection<ServerboundConfigPacket, ClientboundConfigPacket> {
524 Connection::from(self)
525 }
526}
527
528impl Connection<ServerboundConfigPacket, ClientboundConfigPacket> {
529 #[must_use]
532 pub fn game(self) -> Connection<ServerboundGamePacket, ClientboundGamePacket> {
533 Connection::from(self)
534 }
535}
536
537impl Connection<ClientboundConfigPacket, ServerboundConfigPacket> {
538 #[must_use]
541 pub fn game(self) -> Connection<ClientboundGamePacket, ServerboundGamePacket> {
542 Connection::from(self)
543 }
544}
545
546impl Connection<ClientboundGamePacket, ServerboundGamePacket> {
547 #[must_use]
549 pub fn config(self) -> Connection<ClientboundConfigPacket, ServerboundConfigPacket> {
550 Connection::from(self)
551 }
552}
553impl Connection<ServerboundGamePacket, ClientboundGamePacket> {
554 #[must_use]
556 pub fn config(self) -> Connection<ServerboundConfigPacket, ClientboundConfigPacket> {
557 Connection::from(self)
558 }
559}
560
561impl<R1, W1> Connection<R1, W1>
564where
565 R1: ProtocolPacket + Debug,
566 W1: ProtocolPacket + Debug,
567{
568 #[must_use]
571 pub fn from<R2, W2>(connection: Connection<R1, W1>) -> Connection<R2, W2>
572 where
573 R2: ProtocolPacket + Debug,
574 W2: ProtocolPacket + Debug,
575 {
576 Connection {
577 reader: ReadConnection {
578 raw: connection.reader.raw,
579 _reading: PhantomData,
580 },
581 writer: WriteConnection {
582 raw: connection.writer.raw,
583 _writing: PhantomData,
584 },
585 }
586 }
587
588 pub fn wrap(stream: TcpStream) -> Connection<R1, W1> {
590 let (read_stream, write_stream) = stream.into_split();
591
592 Connection {
593 reader: ReadConnection {
594 raw: RawReadConnection {
595 read_stream,
596 buffer: Cursor::new(Vec::new()),
597 compression_threshold: None,
598 dec_cipher: None,
599 },
600 _reading: PhantomData,
601 },
602 writer: WriteConnection {
603 raw: RawWriteConnection {
604 write_stream,
605 compression_threshold: None,
606 enc_cipher: None,
607 },
608 _writing: PhantomData,
609 },
610 }
611 }
612
613 pub fn unwrap(self) -> Result<TcpStream, ReuniteError> {
615 self.reader
616 .raw
617 .read_stream
618 .reunite(self.writer.raw.write_stream)
619 }
620}