1use std::{
4 backtrace::Backtrace,
5 env,
6 fmt::Debug,
7 io,
8 io::{Cursor, Read},
9 sync::LazyLock,
10};
11
12use azalea_buf::{AzaleaReadVar, BufReadError};
13use azalea_crypto::Aes128CfbDec;
14use flate2::read::ZlibDecoder;
15use futures::StreamExt;
16use futures_lite::future;
17use thiserror::Error;
18use tokio::io::AsyncRead;
19use tokio_util::{
20 bytes::Buf,
21 codec::{BytesCodec, FramedRead},
22};
23use tracing::trace;
24
25use crate::packets::ProtocolPacket;
26
/// Errors returned by the packet-reading functions in this file
/// (`read_packet`, `try_read_packet`, `read_raw_packet`,
/// `deserialize_packet`, ...).
#[derive(Error, Debug)]
pub enum ReadPacketError {
    /// The contents of a packet could not be parsed.
    ///
    /// NOTE(review): not constructed anywhere in the visible part of this
    /// file; presumably produced by `ProtocolPacket::read` implementations.
    #[error("Error reading packet {packet_name} (id {packet_id}): {source}")]
    Parse {
        packet_id: u32,
        packet_name: String,
        backtrace: Box<Backtrace>,
        source: BufReadError,
    },
    /// A packet id that has no packet in the given state.
    ///
    /// NOTE(review): not constructed in the visible part of this file.
    #[error("Unknown packet id {id} in state {state_name}")]
    UnknownPacketId { state_name: String, id: u32 },
    /// The VarInt packet id at the start of a packet could not be read (see
    /// `deserialize_packet`).
    #[error("Couldn't read packet id")]
    ReadPacketId { source: BufReadError },
    /// Decompressing a frame failed; wraps the [`DecompressionError`]
    /// returned by `compression_decoder`.
    #[error(transparent)]
    Decompress {
        #[from]
        #[backtrace]
        source: DecompressionError,
    },
    /// Splitting the byte buffer into frames failed; wraps the
    /// [`FrameSplitterError`] returned by `frame_splitter`.
    #[error(transparent)]
    FrameSplitter {
        #[from]
        #[backtrace]
        source: FrameSplitterError,
    },
    /// Bytes were left over after a packet was read.
    ///
    /// NOTE(review): not constructed in the visible part of this file.
    #[error("Leftover data after reading packet {packet_name}: {data:?}")]
    LeftoverData { data: Vec<u8>, packet_name: String },
    /// An I/O error reported while reading from the underlying stream.
    #[error(transparent)]
    IoError {
        #[from]
        #[backtrace]
        source: io::Error,
    },
    /// The stream yielded no more data (end of stream) while a frame was
    /// being read.
    #[error("Connection closed")]
    ConnectionClosed,
}
63
/// Errors produced while splitting the buffered byte stream into
/// length-prefixed frames (see `parse_frame` and `frame_splitter`).
#[derive(Error, Debug)]
pub enum FrameSplitterError {
    /// The VarInt length prefix could not be read, for a reason other than an
    /// I/O error.
    #[error("Couldn't read VarInt length for packet. The previous packet may have been corrupted")]
    LengthRead {
        #[from]
        source: BufReadError,
    },
    /// An I/O error was reported while reading the VarInt length prefix.
    ///
    /// `frame_splitter` treats this as "no complete frame yet" rather than a
    /// failure.
    #[error("Io error")]
    Io {
        #[from]
        #[backtrace]
        source: io::Error,
    },
    /// The declared frame length (`size`) is larger than the number of bytes
    /// currently available after the length prefix (`max`).
    ///
    /// `frame_splitter` treats this as "no complete frame yet" rather than a
    /// failure.
    #[error("Packet is longer than {max} bytes (is {size})")]
    BadLength { max: usize, size: usize },
    /// NOTE(review): not constructed in the visible part of this file.
    #[error("Connection reset by peer")]
    ConnectionReset,
    /// NOTE(review): not constructed in the visible part of this file.
    #[error("Connection closed")]
    ConnectionClosed,
}
84
85fn parse_frame(buffer: &mut Cursor<Vec<u8>>) -> Result<Box<[u8]>, FrameSplitterError> {
88 let mut buffer_copy = Cursor::new(&buffer.get_ref()[buffer.position() as usize..]);
91 let length = match u32::azalea_read_var(&mut buffer_copy) {
93 Ok(length) => length as usize,
94 Err(err) => match err {
95 BufReadError::Io { source } => return Err(FrameSplitterError::Io { source }),
96 _ => return Err(err.into()),
97 },
98 };
99
100 if length > buffer_copy.remaining() {
101 return Err(FrameSplitterError::BadLength {
102 max: buffer_copy.remaining(),
103 size: length,
104 });
105 }
106
107 let varint_length = buffer.remaining() - buffer_copy.remaining();
112
113 buffer.advance(varint_length);
114 let data =
115 buffer.get_ref()[buffer.position() as usize..buffer.position() as usize + length].to_vec();
116 buffer.advance(length);
117
118 if buffer.position() == buffer.get_ref().len() as u64 {
119 buffer.get_mut().clear();
122
123 buffer.get_mut().shrink_to(1024 * 64);
128
129 buffer.set_position(0);
130 }
131
132 Ok(data.into_boxed_slice())
133}
134
135fn frame_splitter(buffer: &mut Cursor<Vec<u8>>) -> Result<Option<Box<[u8]>>, FrameSplitterError> {
136 let read_frame = parse_frame(buffer);
138 match read_frame {
139 Ok(frame) => return Ok(Some(frame)),
140 Err(err) => match err {
141 FrameSplitterError::BadLength { .. } | FrameSplitterError::Io { .. } => {
142 }
144 _ => return Err(err),
145 },
146 }
147
148 Ok(None)
149}
150
151pub fn deserialize_packet<P: ProtocolPacket + Debug>(
152 stream: &mut Cursor<&[u8]>,
153) -> Result<P, Box<ReadPacketError>> {
154 let packet_id =
156 u32::azalea_read_var(stream).map_err(|e| ReadPacketError::ReadPacketId { source: e })?;
157 P::read(packet_id, stream)
158}
159
/// Enables the size sanity checks in `compression_decoder` (declared size
/// compared against the compression threshold and against
/// `MAXIMUM_UNCOMPRESSED_LENGTH`).
static VALIDATE_DECOMPRESSED: bool = true;

/// Largest declared uncompressed size that `compression_decoder` accepts when
/// validation is enabled: 8_388_608 bytes (8 MiB).
pub static MAXIMUM_UNCOMPRESSED_LENGTH: u32 = 8_388_608;
164
/// Errors returned by `compression_decoder`.
#[derive(Error, Debug)]
pub enum DecompressionError {
    /// The VarInt holding the declared uncompressed length could not be read.
    #[error("Couldn't read VarInt length for data")]
    LengthReadError {
        #[from]
        source: BufReadError,
    },
    /// An I/O error was reported while inflating the data.
    #[error("Io error")]
    Io {
        #[from]
        #[backtrace]
        source: io::Error,
    },
    /// The declared uncompressed size is non-zero but smaller than the
    /// compression threshold passed to `compression_decoder`.
    #[error("Badly compressed packet - size of {size} is below server threshold of {threshold}")]
    BelowCompressionThreshold { size: u32, threshold: u32 },
    /// The declared uncompressed size exceeds `MAXIMUM_UNCOMPRESSED_LENGTH`.
    /// Despite the variant name, the comparison is against that maximum, not
    /// against the compression threshold.
    #[error(
        "Badly compressed packet - size of {size} is larger than protocol maximum of {maximum}"
    )]
    AboveCompressionThreshold { size: u32, maximum: u32 },
}
185
186pub fn compression_decoder(
189 stream: &mut Cursor<&[u8]>,
190 compression_threshold: u32,
191) -> Result<Box<[u8]>, DecompressionError> {
192 let n = u32::azalea_read_var(stream)?;
194 if n == 0 {
195 let buf = stream.get_ref()[stream.position() as usize..]
197 .to_vec()
198 .into_boxed_slice();
199 stream.set_position(stream.get_ref().len() as u64);
200 return Ok(buf);
201 }
202
203 if VALIDATE_DECOMPRESSED {
204 if n < compression_threshold {
205 return Err(DecompressionError::BelowCompressionThreshold {
206 size: n,
207 threshold: compression_threshold,
208 });
209 }
210 if n > MAXIMUM_UNCOMPRESSED_LENGTH {
211 return Err(DecompressionError::AboveCompressionThreshold {
212 size: n,
213 maximum: MAXIMUM_UNCOMPRESSED_LENGTH,
214 });
215 }
216 }
217
218 let mut decoded_buf = Vec::with_capacity(n as usize);
221
222 let mut decoder = ZlibDecoder::new(stream);
223 decoder.read_to_end(&mut decoded_buf)?;
224
225 Ok(decoded_buf.into_boxed_slice())
226}
227
228pub async fn read_packet<P: ProtocolPacket + Debug, R>(
237 stream: &mut R,
238 buffer: &mut Cursor<Vec<u8>>,
239 compression_threshold: Option<u32>,
240 cipher: &mut Option<Aes128CfbDec>,
241) -> Result<P, Box<ReadPacketError>>
242where
243 R: AsyncRead + Unpin + Send + Sync,
244{
245 let raw_packet = read_raw_packet(stream, buffer, compression_threshold, cipher).await?;
246 let packet = deserialize_packet(&mut Cursor::new(&raw_packet))?;
247 Ok(packet)
248}
249
250pub fn try_read_packet<P: ProtocolPacket + Debug, R>(
253 stream: &mut R,
254 buffer: &mut Cursor<Vec<u8>>,
255 compression_threshold: Option<u32>,
256 cipher: &mut Option<Aes128CfbDec>,
257) -> Result<Option<P>, Box<ReadPacketError>>
258where
259 R: AsyncRead + Unpin + Send + Sync,
260{
261 let Some(raw_packet) = try_read_raw_packet(stream, buffer, compression_threshold, cipher)?
262 else {
263 return Ok(None);
264 };
265 let packet = deserialize_packet(&mut Cursor::new(&raw_packet))?;
266 Ok(Some(packet))
267}
268
269pub async fn read_raw_packet<R>(
270 stream: &mut R,
271 buffer: &mut Cursor<Vec<u8>>,
272 compression_threshold: Option<u32>,
273 cipher: &mut Option<Aes128CfbDec>,
276) -> Result<Box<[u8]>, Box<ReadPacketError>>
277where
278 R: AsyncRead + Unpin + Send + Sync,
279{
280 loop {
281 if let Some(buf) = read_raw_packet_from_buffer::<R>(buffer, compression_threshold)? {
282 return Ok(buf);
284 };
285
286 let bytes = read_and_decrypt_frame(stream, cipher).await?;
287 buffer.get_mut().extend_from_slice(&bytes);
288 }
289}
290pub fn try_read_raw_packet<R>(
293 stream: &mut R,
294 buffer: &mut Cursor<Vec<u8>>,
295 compression_threshold: Option<u32>,
296 cipher: &mut Option<Aes128CfbDec>,
297) -> Result<Option<Box<[u8]>>, Box<ReadPacketError>>
298where
299 R: AsyncRead + Unpin + Send + Sync,
300{
301 loop {
302 if let Some(buf) = read_raw_packet_from_buffer::<R>(buffer, compression_threshold)? {
303 return Ok(Some(buf));
305 };
306 let Some(bytes) = try_read_and_decrypt_frame(stream, cipher)? else {
307 return Ok(None);
309 };
310 buffer.get_mut().extend_from_slice(&bytes);
312 }
313}
314
315async fn read_and_decrypt_frame<R>(
316 stream: &mut R,
317 cipher: &mut Option<Aes128CfbDec>,
318) -> Result<Box<[u8]>, Box<ReadPacketError>>
319where
320 R: AsyncRead + Unpin + Send + Sync,
321{
322 let mut framed = FramedRead::new(stream, BytesCodec::new());
323
324 let Some(message) = framed.next().await else {
325 return Err(Box::new(ReadPacketError::ConnectionClosed));
326 };
327 let bytes = message.map_err(ReadPacketError::from)?;
328
329 let mut bytes = bytes.to_vec().into_boxed_slice();
330
331 if let Some(cipher) = cipher {
333 azalea_crypto::decrypt_packet(cipher, &mut bytes);
334 }
335
336 Ok(bytes)
337}
338fn try_read_and_decrypt_frame<R>(
339 stream: &mut R,
340 cipher: &mut Option<Aes128CfbDec>,
341) -> Result<Option<Box<[u8]>>, Box<ReadPacketError>>
342where
343 R: AsyncRead + Unpin + Send + Sync,
344{
345 let mut framed = FramedRead::new(stream, BytesCodec::new());
346
347 let Some(message) = future::block_on(future::poll_once(framed.next())) else {
348 return Ok(None);
350 };
351 let Some(message) = message else {
352 return Err(Box::new(ReadPacketError::ConnectionClosed));
353 };
354 let bytes = message.map_err(ReadPacketError::from)?;
355 let mut bytes = bytes.to_vec().into_boxed_slice();
356
357 if let Some(cipher) = cipher {
359 azalea_crypto::decrypt_packet(cipher, &mut bytes);
360 }
361
362 Ok(Some(bytes))
363}
364
365pub fn read_raw_packet_from_buffer<R>(
366 buffer: &mut Cursor<Vec<u8>>,
367 compression_threshold: Option<u32>,
368) -> Result<Option<Box<[u8]>>, Box<ReadPacketError>>
369where
370 R: AsyncRead + Unpin + Send + Sync,
371{
372 let Some(mut buf) = frame_splitter(buffer).map_err(ReadPacketError::from)? else {
373 return Ok(None);
375 };
376
377 if let Some(compression_threshold) = compression_threshold {
378 buf = compression_decoder(&mut Cursor::new(&buf[..]), compression_threshold)
379 .map_err(ReadPacketError::from)?;
380 }
381
382 if tracing::enabled!(tracing::Level::TRACE) {
383 static DO_NOT_CUT_OFF_PACKET_LOGS: LazyLock<bool> = LazyLock::new(|| {
384 env::var("AZALEA_DO_NOT_CUT_OFF_PACKET_LOGS")
385 .map(|s| s == "1" || s == "true")
386 .unwrap_or(false)
387 });
388
389 let buf_string: String = {
390 if !*DO_NOT_CUT_OFF_PACKET_LOGS && buf.len() > 500 {
391 let cut_off_buf = &buf[..500];
392 format!("{cut_off_buf:?}...")
393 } else {
394 format!("{buf:?}")
395 }
396 };
397 trace!("Reading packet with bytes: {buf_string}");
398 };
399
400 Ok(Some(buf))
401}