azalea_protocol/
read.rs

1//! Read packets from a stream.
2
3use std::{
4    backtrace::Backtrace,
5    env,
6    fmt::Debug,
7    io,
8    io::{Cursor, Read},
9    sync::LazyLock,
10};
11
12use azalea_buf::{AzaleaReadVar, BufReadError};
13use azalea_crypto::Aes128CfbDec;
14use flate2::read::ZlibDecoder;
15use futures::StreamExt;
16use futures_lite::future;
17use thiserror::Error;
18use tokio::io::AsyncRead;
19use tokio_util::{
20    bytes::Buf,
21    codec::{BytesCodec, FramedRead},
22};
23use tracing::trace;
24
25use crate::packets::ProtocolPacket;
26
27#[derive(Error, Debug)]
28pub enum ReadPacketError {
29    #[error("Error reading packet {packet_name} (id {packet_id}): {source}")]
30    Parse {
31        packet_id: u32,
32        packet_name: String,
33        backtrace: Box<Backtrace>,
34        source: BufReadError,
35    },
36    #[error("Unknown packet id {id} in state {state_name}")]
37    UnknownPacketId { state_name: String, id: u32 },
38    #[error("Couldn't read packet id")]
39    ReadPacketId { source: BufReadError },
40    #[error(transparent)]
41    Decompress {
42        #[from]
43        #[backtrace]
44        source: DecompressionError,
45    },
46    #[error(transparent)]
47    FrameSplitter {
48        #[from]
49        #[backtrace]
50        source: FrameSplitterError,
51    },
52    #[error("Leftover data after reading packet {packet_name}: {data:?}")]
53    LeftoverData { data: Vec<u8>, packet_name: String },
54    #[error(transparent)]
55    IoError {
56        #[from]
57        #[backtrace]
58        source: io::Error,
59    },
60    #[error("Connection closed")]
61    ConnectionClosed,
62}
63
64#[derive(Error, Debug)]
65pub enum FrameSplitterError {
66    #[error("Couldn't read VarInt length for packet. The previous packet may have been corrupted")]
67    LengthRead {
68        #[from]
69        source: BufReadError,
70    },
71    #[error("Io error")]
72    Io {
73        #[from]
74        #[backtrace]
75        source: io::Error,
76    },
77    #[error("Packet is longer than {max} bytes (is {size})")]
78    BadLength { max: usize, size: usize },
79    #[error("Connection reset by peer")]
80    ConnectionReset,
81    #[error("Connection closed")]
82    ConnectionClosed,
83}
84
85/// Read a length, then read that amount of bytes from the `Cursor<Vec<u8>>`. If
86/// there's not enough data, `None` is returned.
87fn parse_frame(buffer: &mut Cursor<Vec<u8>>) -> Result<Box<[u8]>, FrameSplitterError> {
88    // copy the buffer first and read from the copy, then once we make sure
89    // the packet is all good we read it fully
90    let mut buffer_copy = Cursor::new(&buffer.get_ref()[buffer.position() as usize..]);
91    // Packet Length
92    let length = match u32::azalea_read_var(&mut buffer_copy) {
93        Ok(length) => length as usize,
94        Err(err) => match err {
95            BufReadError::Io { source } => return Err(FrameSplitterError::Io { source }),
96            _ => return Err(err.into()),
97        },
98    };
99
100    if length > buffer_copy.remaining() {
101        return Err(FrameSplitterError::BadLength {
102            max: buffer_copy.remaining(),
103            size: length,
104        });
105    }
106
107    // we read from the copy and we know it's legit, so we can take those bytes
108    // from the real buffer now
109
110    // the length of the varint that says the length of the whole packet
111    let varint_length = buffer.remaining() - buffer_copy.remaining();
112
113    buffer.advance(varint_length);
114    let data =
115        buffer.get_ref()[buffer.position() as usize..buffer.position() as usize + length].to_vec();
116    buffer.advance(length);
117
118    if buffer.position() == buffer.get_ref().len() as u64 {
119        // reset the inner vec once we've reached the end of the buffer so we don't keep
120        // leaking memory
121        buffer.get_mut().clear();
122
123        // we just cap the capacity to 64KB instead of resetting it to save some
124        // allocations.
125        // and the reason we bother capping it at all is to avoid wasting memory if we
126        // get a big packet once and then never again.
127        buffer.get_mut().shrink_to(1024 * 64);
128
129        buffer.set_position(0);
130    }
131
132    Ok(data.into_boxed_slice())
133}
134
135fn frame_splitter(buffer: &mut Cursor<Vec<u8>>) -> Result<Option<Box<[u8]>>, FrameSplitterError> {
136    // https://tokio.rs/tokio/tutorial/framing
137    let read_frame = parse_frame(buffer);
138    match read_frame {
139        Ok(frame) => return Ok(Some(frame)),
140        Err(err) => match err {
141            FrameSplitterError::BadLength { .. } | FrameSplitterError::Io { .. } => {
142                // we probably just haven't read enough yet
143            }
144            _ => return Err(err),
145        },
146    }
147
148    Ok(None)
149}
150
151pub fn deserialize_packet<P: ProtocolPacket + Debug>(
152    stream: &mut Cursor<&[u8]>,
153) -> Result<P, Box<ReadPacketError>> {
154    // Packet ID
155    let packet_id =
156        u32::azalea_read_var(stream).map_err(|e| ReadPacketError::ReadPacketId { source: e })?;
157    P::read(packet_id, stream)
158}
159
160// this is always true in multiplayer, false in singleplayer
161static VALIDATE_DECOMPRESSED: bool = true;
162
163pub static MAXIMUM_UNCOMPRESSED_LENGTH: u32 = 8_388_608;
164
165#[derive(Error, Debug)]
166pub enum DecompressionError {
167    #[error("Couldn't read VarInt length for data")]
168    LengthReadError {
169        #[from]
170        source: BufReadError,
171    },
172    #[error("Io error")]
173    Io {
174        #[from]
175        #[backtrace]
176        source: io::Error,
177    },
178    #[error("Badly compressed packet - size of {size} is below server threshold of {threshold}")]
179    BelowCompressionThreshold { size: u32, threshold: u32 },
180    #[error(
181        "Badly compressed packet - size of {size} is larger than protocol maximum of {maximum}"
182    )]
183    AboveCompressionThreshold { size: u32, maximum: u32 },
184}
185
186/// Get the decompressed bytes from a packet.
187///
188/// The stream must have already been decrypted before passing it to this
189/// function.
190pub fn compression_decoder(
191    stream: &mut Cursor<&[u8]>,
192    compression_threshold: u32,
193) -> Result<Box<[u8]>, DecompressionError> {
194    // Data Length
195    let n = u32::azalea_read_var(stream)?;
196    if n == 0 {
197        // no data size, no compression
198        let buf = stream.get_ref()[stream.position() as usize..]
199            .to_vec()
200            .into_boxed_slice();
201        stream.set_position(stream.get_ref().len() as u64);
202        return Ok(buf);
203    }
204
205    if VALIDATE_DECOMPRESSED {
206        if n < compression_threshold {
207            return Err(DecompressionError::BelowCompressionThreshold {
208                size: n,
209                threshold: compression_threshold,
210            });
211        }
212        if n > MAXIMUM_UNCOMPRESSED_LENGTH {
213            return Err(DecompressionError::AboveCompressionThreshold {
214                size: n,
215                maximum: MAXIMUM_UNCOMPRESSED_LENGTH,
216            });
217        }
218    }
219
220    // VALIDATE_DECOMPRESSED should always be true, so the max they can make us
221    // allocate here is 2mb
222    let mut decoded_buf = Vec::with_capacity(n as usize);
223
224    let mut decoder = ZlibDecoder::new(stream);
225    decoder.read_to_end(&mut decoded_buf)?;
226
227    Ok(decoded_buf.into_boxed_slice())
228}
229
230/// Read a single packet from a stream.
231///
232/// The buffer is required because servers may send multiple packets in the
233/// same frame, so we need to store the packet data that's left to read.
234///
235/// The current protocol state must be passed as a generic.
236///
237/// For the non-waiting version, see [`try_read_packet`].
238pub async fn read_packet<P: ProtocolPacket + Debug, R>(
239    stream: &mut R,
240    buffer: &mut Cursor<Vec<u8>>,
241    compression_threshold: Option<u32>,
242    cipher: &mut Option<Aes128CfbDec>,
243) -> Result<P, Box<ReadPacketError>>
244where
245    R: AsyncRead + Unpin + Send + Sync,
246{
247    let raw_packet = read_raw_packet(stream, buffer, compression_threshold, cipher).await?;
248    let packet = deserialize_packet(&mut Cursor::new(&raw_packet))?;
249    Ok(packet)
250}
251
252/// Try to read a single packet from a stream. Returns None if we haven't
253/// received a full packet yet.
254pub fn try_read_packet<P: ProtocolPacket + Debug, R>(
255    stream: &mut R,
256    buffer: &mut Cursor<Vec<u8>>,
257    compression_threshold: Option<u32>,
258    cipher: &mut Option<Aes128CfbDec>,
259) -> Result<Option<P>, Box<ReadPacketError>>
260where
261    R: AsyncRead + Unpin + Send + Sync,
262{
263    let Some(raw_packet) = try_read_raw_packet(stream, buffer, compression_threshold, cipher)?
264    else {
265        return Ok(None);
266    };
267    let packet = deserialize_packet(&mut Cursor::new(&raw_packet))?;
268    Ok(Some(packet))
269}
270
271pub async fn read_raw_packet<R>(
272    stream: &mut R,
273    buffer: &mut Cursor<Vec<u8>>,
274    compression_threshold: Option<u32>,
275    // this has to be a &mut Option<T> instead of an Option<&mut T> because
276    // otherwise the borrow checker complains about the cipher being moved
277    cipher: &mut Option<Aes128CfbDec>,
278) -> Result<Box<[u8]>, Box<ReadPacketError>>
279where
280    R: AsyncRead + Unpin + Send + Sync,
281{
282    loop {
283        if let Some(buf) = read_raw_packet_from_buffer::<R>(buffer, compression_threshold)? {
284            // we got a full packet!!
285            return Ok(buf);
286        };
287
288        let bytes = read_and_decrypt_frame(stream, cipher).await?;
289        buffer.get_mut().extend_from_slice(&bytes);
290    }
291}
292/// Read a packet from the stream, then if necessary decrypt it, decompress
293/// it, and split it.
294pub fn try_read_raw_packet<R>(
295    stream: &mut R,
296    buffer: &mut Cursor<Vec<u8>>,
297    compression_threshold: Option<u32>,
298    cipher: &mut Option<Aes128CfbDec>,
299) -> Result<Option<Box<[u8]>>, Box<ReadPacketError>>
300where
301    R: AsyncRead + Unpin + Send + Sync,
302{
303    loop {
304        if let Some(buf) = read_raw_packet_from_buffer::<R>(buffer, compression_threshold)? {
305            // we got a full packet!!
306            return Ok(Some(buf));
307        };
308        let Some(bytes) = try_read_and_decrypt_frame(stream, cipher)? else {
309            // no data received
310            return Ok(None);
311        };
312        // we got some data, so add it to the buffer and try again
313        buffer.get_mut().extend_from_slice(&bytes);
314    }
315}
316
317async fn read_and_decrypt_frame<R>(
318    stream: &mut R,
319    cipher: &mut Option<Aes128CfbDec>,
320) -> Result<Box<[u8]>, Box<ReadPacketError>>
321where
322    R: AsyncRead + Unpin + Send + Sync,
323{
324    let mut framed = FramedRead::new(stream, BytesCodec::new());
325
326    let Some(message) = framed.next().await else {
327        return Err(Box::new(ReadPacketError::ConnectionClosed));
328    };
329    let bytes = message.map_err(ReadPacketError::from)?;
330
331    let mut bytes = bytes.to_vec().into_boxed_slice();
332
333    // decrypt if necessary
334    if let Some(cipher) = cipher {
335        azalea_crypto::decrypt_packet(cipher, &mut bytes);
336    }
337
338    Ok(bytes)
339}
340fn try_read_and_decrypt_frame<R>(
341    stream: &mut R,
342    cipher: &mut Option<Aes128CfbDec>,
343) -> Result<Option<Box<[u8]>>, Box<ReadPacketError>>
344where
345    R: AsyncRead + Unpin + Send + Sync,
346{
347    let mut framed = FramedRead::new(stream, BytesCodec::new());
348
349    let Some(message) = future::block_on(future::poll_once(framed.next())) else {
350        // nothing yet
351        return Ok(None);
352    };
353    let Some(message) = message else {
354        return Err(Box::new(ReadPacketError::ConnectionClosed));
355    };
356    let bytes = message.map_err(ReadPacketError::from)?;
357    let mut bytes = bytes.to_vec().into_boxed_slice();
358
359    // decrypt if necessary
360    if let Some(cipher) = cipher {
361        azalea_crypto::decrypt_packet(cipher, &mut bytes);
362    }
363
364    Ok(Some(bytes))
365}
366
367pub fn read_raw_packet_from_buffer<R>(
368    buffer: &mut Cursor<Vec<u8>>,
369    compression_threshold: Option<u32>,
370) -> Result<Option<Box<[u8]>>, Box<ReadPacketError>>
371where
372    R: AsyncRead + Unpin + Send + Sync,
373{
374    let Some(mut buf) = frame_splitter(buffer).map_err(ReadPacketError::from)? else {
375        // no full packet yet :(
376        return Ok(None);
377    };
378
379    if let Some(compression_threshold) = compression_threshold {
380        buf = compression_decoder(&mut Cursor::new(&buf[..]), compression_threshold)
381            .map_err(ReadPacketError::from)?;
382    }
383
384    if tracing::enabled!(tracing::Level::TRACE) {
385        static DO_NOT_CUT_OFF_PACKET_LOGS: LazyLock<bool> = LazyLock::new(|| {
386            env::var("AZALEA_DO_NOT_CUT_OFF_PACKET_LOGS")
387                .map(|s| s == "1" || s == "true")
388                .unwrap_or(false)
389        });
390
391        let buf_string: String = {
392            if !*DO_NOT_CUT_OFF_PACKET_LOGS && buf.len() > 500 {
393                let cut_off_buf = &buf[..500];
394                format!("{cut_off_buf:?}...")
395            } else {
396                format!("{buf:?}")
397            }
398        };
399        trace!("Reading packet with bytes: {buf_string}");
400    };
401
402    Ok(Some(buf))
403}