1use std::backtrace::Backtrace;
4use std::env;
5use std::sync::LazyLock;
6use std::{
7 fmt::Debug,
8 io::{Cursor, Read},
9};
10
11use azalea_buf::AzaleaReadVar;
12use azalea_buf::BufReadError;
13use azalea_crypto::Aes128CfbDec;
14use flate2::read::ZlibDecoder;
15use futures::StreamExt;
16use futures_lite::future;
17use thiserror::Error;
18use tokio::io::AsyncRead;
19use tokio_util::bytes::Buf;
20use tokio_util::codec::{BytesCodec, FramedRead};
21use tracing::trace;
22
23use crate::packets::ProtocolPacket;
24
25#[derive(Error, Debug)]
26pub enum ReadPacketError {
27 #[error("Error reading packet {packet_name} (id {packet_id}): {source}")]
28 Parse {
29 packet_id: u32,
30 packet_name: String,
31 backtrace: Box<Backtrace>,
32 source: BufReadError,
33 },
34 #[error("Unknown packet id {id} in state {state_name}")]
35 UnknownPacketId { state_name: String, id: u32 },
36 #[error("Couldn't read packet id")]
37 ReadPacketId { source: BufReadError },
38 #[error(transparent)]
39 Decompress {
40 #[from]
41 #[backtrace]
42 source: DecompressionError,
43 },
44 #[error(transparent)]
45 FrameSplitter {
46 #[from]
47 #[backtrace]
48 source: FrameSplitterError,
49 },
50 #[error("Leftover data after reading packet {packet_name}: {data:?}")]
51 LeftoverData { data: Vec<u8>, packet_name: String },
52 #[error(transparent)]
53 IoError {
54 #[from]
55 #[backtrace]
56 source: std::io::Error,
57 },
58 #[error("Connection closed")]
59 ConnectionClosed,
60}
61
62#[derive(Error, Debug)]
63pub enum FrameSplitterError {
64 #[error("Couldn't read VarInt length for packet. The previous packet may have been corrupted")]
65 LengthRead {
66 #[from]
67 source: BufReadError,
68 },
69 #[error("Io error")]
70 Io {
71 #[from]
72 #[backtrace]
73 source: std::io::Error,
74 },
75 #[error("Packet is longer than {max} bytes (is {size})")]
76 BadLength { max: usize, size: usize },
77 #[error("Connection reset by peer")]
78 ConnectionReset,
79 #[error("Connection closed")]
80 ConnectionClosed,
81}
82
83fn parse_frame(buffer: &mut Cursor<Vec<u8>>) -> Result<Box<[u8]>, FrameSplitterError> {
86 let mut buffer_copy = Cursor::new(&buffer.get_ref()[buffer.position() as usize..]);
89 let length = match u32::azalea_read_var(&mut buffer_copy) {
91 Ok(length) => length as usize,
92 Err(err) => match err {
93 BufReadError::Io { source } => return Err(FrameSplitterError::Io { source }),
94 _ => return Err(err.into()),
95 },
96 };
97
98 if length > buffer_copy.remaining() {
99 return Err(FrameSplitterError::BadLength {
100 max: buffer_copy.remaining(),
101 size: length,
102 });
103 }
104
105 let varint_length = buffer.remaining() - buffer_copy.remaining();
110
111 buffer.advance(varint_length);
112 let data =
113 buffer.get_ref()[buffer.position() as usize..buffer.position() as usize + length].to_vec();
114 buffer.advance(length);
115
116 if buffer.position() == buffer.get_ref().len() as u64 {
117 buffer.get_mut().clear();
120
121 buffer.get_mut().shrink_to(1024 * 64);
126
127 buffer.set_position(0);
128 }
129
130 Ok(data.into_boxed_slice())
131}
132
133fn frame_splitter(buffer: &mut Cursor<Vec<u8>>) -> Result<Option<Box<[u8]>>, FrameSplitterError> {
134 let read_frame = parse_frame(buffer);
136 match read_frame {
137 Ok(frame) => return Ok(Some(frame)),
138 Err(err) => match err {
139 FrameSplitterError::BadLength { .. } | FrameSplitterError::Io { .. } => {
140 }
142 _ => return Err(err),
143 },
144 }
145
146 Ok(None)
147}
148
149pub fn deserialize_packet<P: ProtocolPacket + Debug>(
150 stream: &mut Cursor<&[u8]>,
151) -> Result<P, Box<ReadPacketError>> {
152 let packet_id =
154 u32::azalea_read_var(stream).map_err(|e| ReadPacketError::ReadPacketId { source: e })?;
155 P::read(packet_id, stream)
156}
157
158static VALIDATE_DECOMPRESSED: bool = true;
160
161pub static MAXIMUM_UNCOMPRESSED_LENGTH: u32 = 2_097_152;
162
163#[derive(Error, Debug)]
164pub enum DecompressionError {
165 #[error("Couldn't read VarInt length for data")]
166 LengthReadError {
167 #[from]
168 source: BufReadError,
169 },
170 #[error("Io error")]
171 Io {
172 #[from]
173 #[backtrace]
174 source: std::io::Error,
175 },
176 #[error("Badly compressed packet - size of {size} is below server threshold of {threshold}")]
177 BelowCompressionThreshold { size: u32, threshold: u32 },
178 #[error(
179 "Badly compressed packet - size of {size} is larger than protocol maximum of {maximum}"
180 )]
181 AboveCompressionThreshold { size: u32, maximum: u32 },
182}
183
184pub fn compression_decoder(
187 stream: &mut Cursor<&[u8]>,
188 compression_threshold: u32,
189) -> Result<Box<[u8]>, DecompressionError> {
190 let n = u32::azalea_read_var(stream)?;
192 if n == 0 {
193 let buf = stream.get_ref()[stream.position() as usize..]
195 .to_vec()
196 .into_boxed_slice();
197 stream.set_position(stream.get_ref().len() as u64);
198 return Ok(buf);
199 }
200
201 if VALIDATE_DECOMPRESSED {
202 if n < compression_threshold {
203 return Err(DecompressionError::BelowCompressionThreshold {
204 size: n,
205 threshold: compression_threshold,
206 });
207 }
208 if n > MAXIMUM_UNCOMPRESSED_LENGTH {
209 return Err(DecompressionError::AboveCompressionThreshold {
210 size: n,
211 maximum: MAXIMUM_UNCOMPRESSED_LENGTH,
212 });
213 }
214 }
215
216 let mut decoded_buf = Vec::with_capacity(n as usize);
219
220 let mut decoder = ZlibDecoder::new(stream);
221 decoder.read_to_end(&mut decoded_buf)?;
222
223 Ok(decoded_buf.into_boxed_slice())
224}
225
226pub async fn read_packet<P: ProtocolPacket + Debug, R>(
235 stream: &mut R,
236 buffer: &mut Cursor<Vec<u8>>,
237 compression_threshold: Option<u32>,
238 cipher: &mut Option<Aes128CfbDec>,
239) -> Result<P, Box<ReadPacketError>>
240where
241 R: AsyncRead + Unpin + Send + Sync,
242{
243 let raw_packet = read_raw_packet(stream, buffer, compression_threshold, cipher).await?;
244 let packet = deserialize_packet(&mut Cursor::new(&raw_packet))?;
245 Ok(packet)
246}
247
248pub fn try_read_packet<P: ProtocolPacket + Debug, R>(
251 stream: &mut R,
252 buffer: &mut Cursor<Vec<u8>>,
253 compression_threshold: Option<u32>,
254 cipher: &mut Option<Aes128CfbDec>,
255) -> Result<Option<P>, Box<ReadPacketError>>
256where
257 R: AsyncRead + Unpin + Send + Sync,
258{
259 let Some(raw_packet) = try_read_raw_packet(stream, buffer, compression_threshold, cipher)?
260 else {
261 return Ok(None);
262 };
263 let packet = deserialize_packet(&mut Cursor::new(&raw_packet))?;
264 Ok(Some(packet))
265}
266
267pub async fn read_raw_packet<R>(
268 stream: &mut R,
269 buffer: &mut Cursor<Vec<u8>>,
270 compression_threshold: Option<u32>,
271 cipher: &mut Option<Aes128CfbDec>,
274) -> Result<Box<[u8]>, Box<ReadPacketError>>
275where
276 R: AsyncRead + Unpin + Send + Sync,
277{
278 loop {
279 if let Some(buf) = read_raw_packet_from_buffer::<R>(buffer, compression_threshold)? {
280 return Ok(buf);
282 };
283
284 let bytes = read_and_decrypt_frame(stream, cipher).await?;
285 buffer.get_mut().extend_from_slice(&bytes);
286 }
287}
288pub fn try_read_raw_packet<R>(
289 stream: &mut R,
290 buffer: &mut Cursor<Vec<u8>>,
291 compression_threshold: Option<u32>,
292 cipher: &mut Option<Aes128CfbDec>,
293) -> Result<Option<Box<[u8]>>, Box<ReadPacketError>>
294where
295 R: AsyncRead + Unpin + Send + Sync,
296{
297 loop {
298 if let Some(buf) = read_raw_packet_from_buffer::<R>(buffer, compression_threshold)? {
299 return Ok(Some(buf));
301 };
302 let Some(bytes) = try_read_and_decrypt_frame(stream, cipher)? else {
303 return Ok(None);
305 };
306 buffer.get_mut().extend_from_slice(&bytes);
308 }
309}
310
311async fn read_and_decrypt_frame<R>(
312 stream: &mut R,
313 cipher: &mut Option<Aes128CfbDec>,
314) -> Result<Box<[u8]>, Box<ReadPacketError>>
315where
316 R: AsyncRead + Unpin + Send + Sync,
317{
318 let mut framed = FramedRead::new(stream, BytesCodec::new());
319
320 let Some(message) = framed.next().await else {
321 return Err(Box::new(ReadPacketError::ConnectionClosed));
322 };
323 let bytes = message.map_err(ReadPacketError::from)?;
324
325 let mut bytes = bytes.to_vec().into_boxed_slice();
326
327 if let Some(cipher) = cipher {
329 azalea_crypto::decrypt_packet(cipher, &mut bytes);
330 }
331
332 Ok(bytes)
333}
334fn try_read_and_decrypt_frame<R>(
335 stream: &mut R,
336 cipher: &mut Option<Aes128CfbDec>,
337) -> Result<Option<Box<[u8]>>, Box<ReadPacketError>>
338where
339 R: AsyncRead + Unpin + Send + Sync,
340{
341 let mut framed = FramedRead::new(stream, BytesCodec::new());
342
343 let Some(message) = future::block_on(future::poll_once(framed.next())) else {
344 return Ok(None);
346 };
347 let Some(message) = message else {
348 return Err(Box::new(ReadPacketError::ConnectionClosed));
349 };
350 let bytes = message.map_err(ReadPacketError::from)?;
351 let mut bytes = bytes.to_vec().into_boxed_slice();
352
353 if let Some(cipher) = cipher {
355 azalea_crypto::decrypt_packet(cipher, &mut bytes);
356 }
357
358 Ok(Some(bytes))
359}
360
361pub fn read_raw_packet_from_buffer<R>(
362 buffer: &mut Cursor<Vec<u8>>,
363 compression_threshold: Option<u32>,
364) -> Result<Option<Box<[u8]>>, Box<ReadPacketError>>
365where
366 R: AsyncRead + Unpin + Send + Sync,
367{
368 let Some(mut buf) = frame_splitter(buffer).map_err(ReadPacketError::from)? else {
369 return Ok(None);
371 };
372
373 if let Some(compression_threshold) = compression_threshold {
374 buf = compression_decoder(&mut Cursor::new(&buf[..]), compression_threshold)
375 .map_err(ReadPacketError::from)?;
376 }
377
378 if tracing::enabled!(tracing::Level::TRACE) {
379 static DO_NOT_CUT_OFF_PACKET_LOGS: LazyLock<bool> = LazyLock::new(|| {
380 env::var("AZALEA_DO_NOT_CUT_OFF_PACKET_LOGS")
381 .map(|s| s == "1" || s == "true")
382 .unwrap_or(false)
383 });
384
385 let buf_string: String = {
386 if !*DO_NOT_CUT_OFF_PACKET_LOGS && buf.len() > 500 {
387 let cut_off_buf = &buf[..500];
388 format!("{cut_off_buf:?}...")
389 } else {
390 format!("{buf:?}")
391 }
392 };
393 trace!("Reading packet with bytes: {buf_string}");
394 };
395
396 Ok(Some(buf))
397}