1use std::{
4 backtrace::Backtrace,
5 env,
6 fmt::Debug,
7 io,
8 io::{Cursor, Read},
9 sync::LazyLock,
10};
11
12use azalea_buf::{AzaleaReadVar, BufReadError};
13use azalea_crypto::Aes128CfbDec;
14use flate2::read::ZlibDecoder;
15use futures::StreamExt;
16use futures_lite::future;
17use thiserror::Error;
18use tokio::io::AsyncRead;
19use tokio_util::{
20 bytes::Buf,
21 codec::{BytesCodec, FramedRead},
22};
23use tracing::trace;
24
25use crate::packets::ProtocolPacket;
26
27#[derive(Error, Debug)]
28pub enum ReadPacketError {
29 #[error("Error reading packet {packet_name} (id {packet_id}): {source}")]
30 Parse {
31 packet_id: u32,
32 packet_name: String,
33 backtrace: Box<Backtrace>,
34 source: BufReadError,
35 },
36 #[error("Unknown packet id {id} in state {state_name}")]
37 UnknownPacketId { state_name: String, id: u32 },
38 #[error("Couldn't read packet id")]
39 ReadPacketId { source: BufReadError },
40 #[error(transparent)]
41 Decompress {
42 #[from]
43 #[backtrace]
44 source: DecompressionError,
45 },
46 #[error(transparent)]
47 FrameSplitter {
48 #[from]
49 #[backtrace]
50 source: FrameSplitterError,
51 },
52 #[error("Leftover data after reading packet {packet_name}: {data:?}")]
53 LeftoverData { data: Vec<u8>, packet_name: String },
54 #[error(transparent)]
55 IoError {
56 #[from]
57 #[backtrace]
58 source: io::Error,
59 },
60 #[error("Connection closed")]
61 ConnectionClosed,
62}
63
64#[derive(Error, Debug)]
65pub enum FrameSplitterError {
66 #[error("Couldn't read VarInt length for packet. The previous packet may have been corrupted")]
67 LengthRead {
68 #[from]
69 source: BufReadError,
70 },
71 #[error("Io error")]
72 Io {
73 #[from]
74 #[backtrace]
75 source: io::Error,
76 },
77 #[error("Packet is longer than {max} bytes (is {size})")]
78 BadLength { max: usize, size: usize },
79 #[error("Connection reset by peer")]
80 ConnectionReset,
81 #[error("Connection closed")]
82 ConnectionClosed,
83}
84
85fn parse_frame(buffer: &mut Cursor<Vec<u8>>) -> Result<Box<[u8]>, FrameSplitterError> {
88 let mut buffer_copy = Cursor::new(&buffer.get_ref()[buffer.position() as usize..]);
91 let length = match u32::azalea_read_var(&mut buffer_copy) {
93 Ok(length) => length as usize,
94 Err(err) => match err {
95 BufReadError::Io { source } => return Err(FrameSplitterError::Io { source }),
96 _ => return Err(err.into()),
97 },
98 };
99
100 if length > buffer_copy.remaining() {
101 return Err(FrameSplitterError::BadLength {
102 max: buffer_copy.remaining(),
103 size: length,
104 });
105 }
106
107 let varint_length = buffer.remaining() - buffer_copy.remaining();
112
113 buffer.advance(varint_length);
114 let data =
115 buffer.get_ref()[buffer.position() as usize..buffer.position() as usize + length].to_vec();
116 buffer.advance(length);
117
118 if buffer.position() == buffer.get_ref().len() as u64 {
119 buffer.get_mut().clear();
122
123 buffer.get_mut().shrink_to(1024 * 64);
128
129 buffer.set_position(0);
130 }
131
132 Ok(data.into_boxed_slice())
133}
134
135fn frame_splitter(buffer: &mut Cursor<Vec<u8>>) -> Result<Option<Box<[u8]>>, FrameSplitterError> {
136 let read_frame = parse_frame(buffer);
138 match read_frame {
139 Ok(frame) => return Ok(Some(frame)),
140 Err(err) => match err {
141 FrameSplitterError::BadLength { .. } | FrameSplitterError::Io { .. } => {
142 }
144 _ => return Err(err),
145 },
146 }
147
148 Ok(None)
149}
150
151pub fn deserialize_packet<P: ProtocolPacket + Debug>(
152 stream: &mut Cursor<&[u8]>,
153) -> Result<P, Box<ReadPacketError>> {
154 let packet_id =
156 u32::azalea_read_var(stream).map_err(|e| ReadPacketError::ReadPacketId { source: e })?;
157 P::read(packet_id, stream)
158}
159
/// When `true`, `compression_decoder` rejects packets whose declared
/// uncompressed size is below the compression threshold or above
/// [`MAXIMUM_UNCOMPRESSED_LENGTH`].
static VALIDATE_DECOMPRESSED: bool = true;

/// Largest declared uncompressed packet size, in bytes, that
/// `compression_decoder` accepts (8 MiB).
pub static MAXIMUM_UNCOMPRESSED_LENGTH: u32 = 8 * 1024 * 1024;
164
165#[derive(Error, Debug)]
166pub enum DecompressionError {
167 #[error("Couldn't read VarInt length for data")]
168 LengthReadError {
169 #[from]
170 source: BufReadError,
171 },
172 #[error("Io error")]
173 Io {
174 #[from]
175 #[backtrace]
176 source: io::Error,
177 },
178 #[error("Badly compressed packet - size of {size} is below server threshold of {threshold}")]
179 BelowCompressionThreshold { size: u32, threshold: u32 },
180 #[error(
181 "Badly compressed packet - size of {size} is larger than protocol maximum of {maximum}"
182 )]
183 AboveCompressionThreshold { size: u32, maximum: u32 },
184}
185
186pub fn compression_decoder(
191 stream: &mut Cursor<&[u8]>,
192 compression_threshold: u32,
193) -> Result<Box<[u8]>, DecompressionError> {
194 let n = u32::azalea_read_var(stream)?;
196 if n == 0 {
197 let buf = stream.get_ref()[stream.position() as usize..]
199 .to_vec()
200 .into_boxed_slice();
201 stream.set_position(stream.get_ref().len() as u64);
202 return Ok(buf);
203 }
204
205 if VALIDATE_DECOMPRESSED {
206 if n < compression_threshold {
207 return Err(DecompressionError::BelowCompressionThreshold {
208 size: n,
209 threshold: compression_threshold,
210 });
211 }
212 if n > MAXIMUM_UNCOMPRESSED_LENGTH {
213 return Err(DecompressionError::AboveCompressionThreshold {
214 size: n,
215 maximum: MAXIMUM_UNCOMPRESSED_LENGTH,
216 });
217 }
218 }
219
220 let mut decoded_buf = Vec::with_capacity(n as usize);
223
224 let mut decoder = ZlibDecoder::new(stream);
225 decoder.read_to_end(&mut decoded_buf)?;
226
227 Ok(decoded_buf.into_boxed_slice())
228}
229
230pub async fn read_packet<P: ProtocolPacket + Debug, R>(
239 stream: &mut R,
240 buffer: &mut Cursor<Vec<u8>>,
241 compression_threshold: Option<u32>,
242 cipher: &mut Option<Aes128CfbDec>,
243) -> Result<P, Box<ReadPacketError>>
244where
245 R: AsyncRead + Unpin + Send + Sync,
246{
247 let raw_packet = read_raw_packet(stream, buffer, compression_threshold, cipher).await?;
248 let packet = deserialize_packet(&mut Cursor::new(&raw_packet))?;
249 Ok(packet)
250}
251
252pub fn try_read_packet<P: ProtocolPacket + Debug, R>(
255 stream: &mut R,
256 buffer: &mut Cursor<Vec<u8>>,
257 compression_threshold: Option<u32>,
258 cipher: &mut Option<Aes128CfbDec>,
259) -> Result<Option<P>, Box<ReadPacketError>>
260where
261 R: AsyncRead + Unpin + Send + Sync,
262{
263 let Some(raw_packet) = try_read_raw_packet(stream, buffer, compression_threshold, cipher)?
264 else {
265 return Ok(None);
266 };
267 let packet = deserialize_packet(&mut Cursor::new(&raw_packet))?;
268 Ok(Some(packet))
269}
270
271pub async fn read_raw_packet<R>(
272 stream: &mut R,
273 buffer: &mut Cursor<Vec<u8>>,
274 compression_threshold: Option<u32>,
275 cipher: &mut Option<Aes128CfbDec>,
278) -> Result<Box<[u8]>, Box<ReadPacketError>>
279where
280 R: AsyncRead + Unpin + Send + Sync,
281{
282 loop {
283 if let Some(buf) = read_raw_packet_from_buffer::<R>(buffer, compression_threshold)? {
284 return Ok(buf);
286 };
287
288 let bytes = read_and_decrypt_frame(stream, cipher).await?;
289 buffer.get_mut().extend_from_slice(&bytes);
290 }
291}
292pub fn try_read_raw_packet<R>(
295 stream: &mut R,
296 buffer: &mut Cursor<Vec<u8>>,
297 compression_threshold: Option<u32>,
298 cipher: &mut Option<Aes128CfbDec>,
299) -> Result<Option<Box<[u8]>>, Box<ReadPacketError>>
300where
301 R: AsyncRead + Unpin + Send + Sync,
302{
303 loop {
304 if let Some(buf) = read_raw_packet_from_buffer::<R>(buffer, compression_threshold)? {
305 return Ok(Some(buf));
307 };
308 let Some(bytes) = try_read_and_decrypt_frame(stream, cipher)? else {
309 return Ok(None);
311 };
312 buffer.get_mut().extend_from_slice(&bytes);
314 }
315}
316
317async fn read_and_decrypt_frame<R>(
318 stream: &mut R,
319 cipher: &mut Option<Aes128CfbDec>,
320) -> Result<Box<[u8]>, Box<ReadPacketError>>
321where
322 R: AsyncRead + Unpin + Send + Sync,
323{
324 let mut framed = FramedRead::new(stream, BytesCodec::new());
325
326 let Some(message) = framed.next().await else {
327 return Err(Box::new(ReadPacketError::ConnectionClosed));
328 };
329 let bytes = message.map_err(ReadPacketError::from)?;
330
331 let mut bytes = bytes.to_vec().into_boxed_slice();
332
333 if let Some(cipher) = cipher {
335 azalea_crypto::decrypt_packet(cipher, &mut bytes);
336 }
337
338 Ok(bytes)
339}
340fn try_read_and_decrypt_frame<R>(
341 stream: &mut R,
342 cipher: &mut Option<Aes128CfbDec>,
343) -> Result<Option<Box<[u8]>>, Box<ReadPacketError>>
344where
345 R: AsyncRead + Unpin + Send + Sync,
346{
347 let mut framed = FramedRead::new(stream, BytesCodec::new());
348
349 let Some(message) = future::block_on(future::poll_once(framed.next())) else {
350 return Ok(None);
352 };
353 let Some(message) = message else {
354 return Err(Box::new(ReadPacketError::ConnectionClosed));
355 };
356 let bytes = message.map_err(ReadPacketError::from)?;
357 let mut bytes = bytes.to_vec().into_boxed_slice();
358
359 if let Some(cipher) = cipher {
361 azalea_crypto::decrypt_packet(cipher, &mut bytes);
362 }
363
364 Ok(Some(bytes))
365}
366
367pub fn read_raw_packet_from_buffer<R>(
368 buffer: &mut Cursor<Vec<u8>>,
369 compression_threshold: Option<u32>,
370) -> Result<Option<Box<[u8]>>, Box<ReadPacketError>>
371where
372 R: AsyncRead + Unpin + Send + Sync,
373{
374 let Some(mut buf) = frame_splitter(buffer).map_err(ReadPacketError::from)? else {
375 return Ok(None);
377 };
378
379 if let Some(compression_threshold) = compression_threshold {
380 buf = compression_decoder(&mut Cursor::new(&buf[..]), compression_threshold)
381 .map_err(ReadPacketError::from)?;
382 }
383
384 if tracing::enabled!(tracing::Level::TRACE) {
385 static DO_NOT_CUT_OFF_PACKET_LOGS: LazyLock<bool> = LazyLock::new(|| {
386 env::var("AZALEA_DO_NOT_CUT_OFF_PACKET_LOGS")
387 .map(|s| s == "1" || s == "true")
388 .unwrap_or(false)
389 });
390
391 let buf_string: String = {
392 if !*DO_NOT_CUT_OFF_PACKET_LOGS && buf.len() > 500 {
393 let cut_off_buf = &buf[..500];
394 format!("{cut_off_buf:?}...")
395 } else {
396 format!("{buf:?}")
397 }
398 };
399 trace!("Reading packet with bytes: {buf_string}");
400 };
401
402 Ok(Some(buf))
403}