1use std::fmt::{self, Debug, Display};
4use std::io::{self, Cursor};
5use std::marker::PhantomData;
6use std::net::SocketAddr;
7
8use azalea_auth::game_profile::GameProfile;
9use azalea_auth::sessionserver::{ClientSessionServerError, ServerSessionServerError};
10use azalea_crypto::{Aes128CfbDec, Aes128CfbEnc};
11use thiserror::Error;
12use tokio::io::{AsyncWriteExt, BufStream};
13use tokio::net::TcpStream;
14use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf, ReuniteError};
15use tracing::{error, info};
16use uuid::Uuid;
17
18use crate::packets::ProtocolPacket;
19use crate::packets::config::{ClientboundConfigPacket, ServerboundConfigPacket};
20use crate::packets::game::{ClientboundGamePacket, ServerboundGamePacket};
21use crate::packets::handshake::{ClientboundHandshakePacket, ServerboundHandshakePacket};
22use crate::packets::login::c_hello::ClientboundHello;
23use crate::packets::login::{ClientboundLoginPacket, ServerboundLoginPacket};
24use crate::packets::status::{ClientboundStatusPacket, ServerboundStatusPacket};
25use crate::read::{ReadPacketError, deserialize_packet, read_raw_packet, try_read_raw_packet};
26use crate::write::{serialize_packet, write_raw_packet};
27
28pub struct RawReadConnection {
29 pub read_stream: OwnedReadHalf,
30 pub buffer: Cursor<Vec<u8>>,
31 pub compression_threshold: Option<u32>,
32 pub dec_cipher: Option<Aes128CfbDec>,
33}
34
35pub struct RawWriteConnection {
36 pub write_stream: OwnedWriteHalf,
37 pub compression_threshold: Option<u32>,
38 pub enc_cipher: Option<Aes128CfbEnc>,
39}
40
41pub struct ReadConnection<R: ProtocolPacket> {
43 pub raw: RawReadConnection,
44 _reading: PhantomData<R>,
45}
46
47pub struct WriteConnection<W: ProtocolPacket> {
49 pub raw: RawWriteConnection,
50 _writing: PhantomData<W>,
51}
52
53pub struct Connection<R: ProtocolPacket, W: ProtocolPacket> {
132 pub reader: ReadConnection<R>,
133 pub writer: WriteConnection<W>,
134}
135
136impl RawReadConnection {
137 pub async fn read(&mut self) -> Result<Box<[u8]>, Box<ReadPacketError>> {
138 read_raw_packet::<_>(
139 &mut self.read_stream,
140 &mut self.buffer,
141 self.compression_threshold,
142 &mut self.dec_cipher,
143 )
144 .await
145 }
146
147 pub fn try_read(&mut self) -> Result<Option<Box<[u8]>>, Box<ReadPacketError>> {
148 try_read_raw_packet::<_>(
149 &mut self.read_stream,
150 &mut self.buffer,
151 self.compression_threshold,
152 &mut self.dec_cipher,
153 )
154 }
155}
156
157impl RawWriteConnection {
158 pub async fn write(&mut self, packet: &[u8]) -> io::Result<()> {
159 if let Err(e) = write_raw_packet(
160 packet,
161 &mut self.write_stream,
162 self.compression_threshold,
163 &mut self.enc_cipher,
164 )
165 .await
166 {
167 if e.kind() == io::ErrorKind::BrokenPipe {
169 info!("Broken pipe, shutting down connection.");
170 if let Err(e) = self.shutdown().await {
171 error!("Couldn't shut down: {}", e);
172 }
173 }
174 return Err(e);
175 }
176 Ok(())
177 }
178
179 pub async fn shutdown(&mut self) -> io::Result<()> {
181 self.write_stream.shutdown().await
182 }
183}
184
185impl<R> ReadConnection<R>
186where
187 R: ProtocolPacket + Debug,
188{
189 pub async fn read(&mut self) -> Result<R, Box<ReadPacketError>> {
191 let raw_packet = self.raw.read().await?;
192 deserialize_packet(&mut Cursor::new(&raw_packet))
193 }
194
195 pub fn try_read(&mut self) -> Result<Option<R>, Box<ReadPacketError>> {
198 let Some(raw_packet) = self.raw.try_read()? else {
199 return Ok(None);
200 };
201 Ok(Some(deserialize_packet(&mut Cursor::new(&raw_packet))?))
202 }
203}
204impl<W> WriteConnection<W>
205where
206 W: ProtocolPacket + Debug,
207{
208 pub async fn write(&mut self, packet: W) -> io::Result<()> {
210 self.raw.write(&serialize_packet(&packet).unwrap()).await
211 }
212
213 pub async fn shutdown(&mut self) -> io::Result<()> {
215 self.raw.shutdown().await
216 }
217}
218
219impl<R, W> Connection<R, W>
220where
221 R: ProtocolPacket + Debug,
222 W: ProtocolPacket + Debug,
223{
224 pub async fn read(&mut self) -> Result<R, Box<ReadPacketError>> {
226 self.reader.read().await
227 }
228
229 pub fn try_read(&mut self) -> Result<Option<R>, Box<ReadPacketError>> {
232 self.reader.try_read()
233 }
234
235 pub async fn write(&mut self, packet: impl crate::packets::Packet<W>) -> io::Result<()> {
237 let packet = packet.into_variant();
238 self.writer.write(packet).await
239 }
240
241 #[must_use]
243 pub fn into_split(self) -> (ReadConnection<R>, WriteConnection<W>) {
244 (self.reader, self.writer)
245 }
246
247 #[must_use]
252 pub fn into_split_raw(self) -> (RawReadConnection, RawWriteConnection) {
253 (self.reader.raw, self.writer.raw)
254 }
255}
256
257#[derive(Error, Debug)]
258pub enum ConnectionError {
259 #[error("{0}")]
260 Io(#[from] io::Error),
261}
262
263use socks5_impl::protocol::UserKey;
264
265#[derive(Debug, Clone)]
267pub struct Proxy {
268 pub addr: SocketAddr,
269 pub auth: Option<UserKey>,
270}
271
272impl Proxy {
273 pub fn new(addr: SocketAddr, auth: Option<UserKey>) -> Self {
274 Self { addr, auth }
275 }
276}
277impl Display for Proxy {
278 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
279 write!(f, "socks5://")?;
280 if let Some(auth) = &self.auth {
281 write!(f, "{auth}@")?;
282 }
283 write!(f, "{}", self.addr)
284 }
285}
286
287impl Connection<ClientboundHandshakePacket, ServerboundHandshakePacket> {
288 pub async fn new(address: &SocketAddr) -> Result<Self, ConnectionError> {
290 let stream = TcpStream::connect(address).await?;
291
292 stream.set_nodelay(true)?;
294
295 Self::new_from_stream(stream).await
296 }
297
298 pub async fn new_with_proxy(
301 address: &SocketAddr,
302 proxy: Proxy,
303 ) -> Result<Self, ConnectionError> {
304 let proxy_stream = TcpStream::connect(proxy.addr).await?;
305 let mut stream = BufStream::new(proxy_stream);
306
307 let _ = socks5_impl::client::connect(&mut stream, address, proxy.auth)
308 .await
309 .map_err(io::Error::other)?;
310
311 Self::new_from_stream(stream.into_inner()).await
312 }
313
314 pub async fn new_from_stream(stream: TcpStream) -> Result<Self, ConnectionError> {
317 let (read_stream, write_stream) = stream.into_split();
318
319 Ok(Connection {
320 reader: ReadConnection {
321 raw: RawReadConnection {
322 read_stream,
323 buffer: Cursor::new(Vec::new()),
324 compression_threshold: None,
325 dec_cipher: None,
326 },
327 _reading: PhantomData,
328 },
329 writer: WriteConnection {
330 raw: RawWriteConnection {
331 write_stream,
332 compression_threshold: None,
333 enc_cipher: None,
334 },
335 _writing: PhantomData,
336 },
337 })
338 }
339
340 #[must_use]
343 pub fn login(self) -> Connection<ClientboundLoginPacket, ServerboundLoginPacket> {
344 Connection::from(self)
345 }
346
347 #[must_use]
350 pub fn status(self) -> Connection<ClientboundStatusPacket, ServerboundStatusPacket> {
351 Connection::from(self)
352 }
353}
354
355impl Connection<ClientboundLoginPacket, ServerboundLoginPacket> {
356 pub fn set_compression_threshold(&mut self, threshold: i32) {
361 if threshold >= 0 {
363 self.reader.raw.compression_threshold = Some(threshold as u32);
364 self.writer.raw.compression_threshold = Some(threshold as u32);
365 } else {
366 self.reader.raw.compression_threshold = None;
367 self.writer.raw.compression_threshold = None;
368 }
369 }
370
371 pub fn set_encryption_key(&mut self, key: [u8; 16]) {
374 let (enc_cipher, dec_cipher) = azalea_crypto::create_cipher(&key);
375 self.reader.raw.dec_cipher = Some(dec_cipher);
376 self.writer.raw.enc_cipher = Some(enc_cipher);
377 }
378
379 #[must_use]
382 pub fn config(self) -> Connection<ClientboundConfigPacket, ServerboundConfigPacket> {
383 Connection::from(self)
384 }
385
386 pub async fn authenticate(
438 &self,
439 access_token: &str,
440 uuid: &Uuid,
441 private_key: [u8; 16],
442 packet: &ClientboundHello,
443 ) -> Result<(), ClientSessionServerError> {
444 azalea_auth::sessionserver::join(
445 access_token,
446 &packet.public_key,
447 &private_key,
448 uuid,
449 &packet.server_id,
450 )
451 .await
452 }
453}
454
455impl Connection<ServerboundHandshakePacket, ClientboundHandshakePacket> {
456 #[must_use]
459 pub fn login(self) -> Connection<ServerboundLoginPacket, ClientboundLoginPacket> {
460 Connection::from(self)
461 }
462
463 #[must_use]
466 pub fn status(self) -> Connection<ServerboundStatusPacket, ClientboundStatusPacket> {
467 Connection::from(self)
468 }
469}
470
471impl Connection<ServerboundLoginPacket, ClientboundLoginPacket> {
472 pub fn set_compression_threshold(&mut self, threshold: i32) {
476 if threshold >= 0 {
478 self.reader.raw.compression_threshold = Some(threshold as u32);
479 self.writer.raw.compression_threshold = Some(threshold as u32);
480 } else {
481 self.reader.raw.compression_threshold = None;
482 self.writer.raw.compression_threshold = None;
483 }
484 }
485
486 pub fn set_encryption_key(&mut self, key: [u8; 16]) {
489 let (enc_cipher, dec_cipher) = azalea_crypto::create_cipher(&key);
490 self.reader.raw.dec_cipher = Some(dec_cipher);
491 self.writer.raw.enc_cipher = Some(enc_cipher);
492 }
493
494 #[must_use]
497 pub fn game(self) -> Connection<ServerboundGamePacket, ClientboundGamePacket> {
498 Connection::from(self)
499 }
500
501 pub async fn authenticate(
505 &self,
506 username: &str,
507 public_key: &[u8],
508 private_key: &[u8; 16],
509 ip: Option<&str>,
510 ) -> Result<GameProfile, ServerSessionServerError> {
511 azalea_auth::sessionserver::serverside_auth(username, public_key, private_key, ip).await
512 }
513
514 #[must_use]
516 pub fn config(self) -> Connection<ServerboundConfigPacket, ClientboundConfigPacket> {
517 Connection::from(self)
518 }
519}
520
521impl Connection<ServerboundConfigPacket, ClientboundConfigPacket> {
522 #[must_use]
525 pub fn game(self) -> Connection<ServerboundGamePacket, ClientboundGamePacket> {
526 Connection::from(self)
527 }
528}
529
530impl Connection<ClientboundConfigPacket, ServerboundConfigPacket> {
531 #[must_use]
534 pub fn game(self) -> Connection<ClientboundGamePacket, ServerboundGamePacket> {
535 Connection::from(self)
536 }
537}
538
539impl Connection<ClientboundGamePacket, ServerboundGamePacket> {
540 #[must_use]
542 pub fn config(self) -> Connection<ClientboundConfigPacket, ServerboundConfigPacket> {
543 Connection::from(self)
544 }
545}
546impl Connection<ServerboundGamePacket, ClientboundGamePacket> {
547 #[must_use]
549 pub fn config(self) -> Connection<ServerboundConfigPacket, ClientboundConfigPacket> {
550 Connection::from(self)
551 }
552}
553
554impl<R1, W1> Connection<R1, W1>
557where
558 R1: ProtocolPacket + Debug,
559 W1: ProtocolPacket + Debug,
560{
561 #[must_use]
564 pub fn from<R2, W2>(connection: Connection<R1, W1>) -> Connection<R2, W2>
565 where
566 R2: ProtocolPacket + Debug,
567 W2: ProtocolPacket + Debug,
568 {
569 Connection {
570 reader: ReadConnection {
571 raw: connection.reader.raw,
572 _reading: PhantomData,
573 },
574 writer: WriteConnection {
575 raw: connection.writer.raw,
576 _writing: PhantomData,
577 },
578 }
579 }
580
581 pub fn wrap(stream: TcpStream) -> Connection<R1, W1> {
583 let (read_stream, write_stream) = stream.into_split();
584
585 Connection {
586 reader: ReadConnection {
587 raw: RawReadConnection {
588 read_stream,
589 buffer: Cursor::new(Vec::new()),
590 compression_threshold: None,
591 dec_cipher: None,
592 },
593 _reading: PhantomData,
594 },
595 writer: WriteConnection {
596 raw: RawWriteConnection {
597 write_stream,
598 compression_threshold: None,
599 enc_cipher: None,
600 },
601 _writing: PhantomData,
602 },
603 }
604 }
605
606 pub fn unwrap(self) -> Result<TcpStream, ReuniteError> {
608 self.reader
609 .raw
610 .read_stream
611 .reunite(self.writer.raw.write_stream)
612 }
613}