azalea_protocol/
read.rs

1//! Read packets from a stream.
2
3use std::backtrace::Backtrace;
4use std::env;
5use std::sync::LazyLock;
6use std::{
7    fmt::Debug,
8    io::{Cursor, Read},
9};
10
11use azalea_buf::AzaleaReadVar;
12use azalea_buf::BufReadError;
13use azalea_crypto::Aes128CfbDec;
14use flate2::read::ZlibDecoder;
15use futures::StreamExt;
16use futures_lite::future;
17use thiserror::Error;
18use tokio::io::AsyncRead;
19use tokio_util::bytes::Buf;
20use tokio_util::codec::{BytesCodec, FramedRead};
21use tracing::trace;
22
23use crate::packets::ProtocolPacket;
24
25#[derive(Error, Debug)]
26pub enum ReadPacketError {
27    #[error("Error reading packet {packet_name} (id {packet_id}): {source}")]
28    Parse {
29        packet_id: u32,
30        packet_name: String,
31        backtrace: Box<Backtrace>,
32        source: BufReadError,
33    },
34    #[error("Unknown packet id {id} in state {state_name}")]
35    UnknownPacketId { state_name: String, id: u32 },
36    #[error("Couldn't read packet id")]
37    ReadPacketId { source: BufReadError },
38    #[error(transparent)]
39    Decompress {
40        #[from]
41        #[backtrace]
42        source: DecompressionError,
43    },
44    #[error(transparent)]
45    FrameSplitter {
46        #[from]
47        #[backtrace]
48        source: FrameSplitterError,
49    },
50    #[error("Leftover data after reading packet {packet_name}: {data:?}")]
51    LeftoverData { data: Vec<u8>, packet_name: String },
52    #[error(transparent)]
53    IoError {
54        #[from]
55        #[backtrace]
56        source: std::io::Error,
57    },
58    #[error("Connection closed")]
59    ConnectionClosed,
60}
61
62#[derive(Error, Debug)]
63pub enum FrameSplitterError {
64    #[error("Couldn't read VarInt length for packet. The previous packet may have been corrupted")]
65    LengthRead {
66        #[from]
67        source: BufReadError,
68    },
69    #[error("Io error")]
70    Io {
71        #[from]
72        #[backtrace]
73        source: std::io::Error,
74    },
75    #[error("Packet is longer than {max} bytes (is {size})")]
76    BadLength { max: usize, size: usize },
77    #[error("Connection reset by peer")]
78    ConnectionReset,
79    #[error("Connection closed")]
80    ConnectionClosed,
81}
82
83/// Read a length, then read that amount of bytes from the `Cursor<Vec<u8>>`. If
84/// there's not enough data, return None
85fn parse_frame(buffer: &mut Cursor<Vec<u8>>) -> Result<Box<[u8]>, FrameSplitterError> {
86    // copy the buffer first and read from the copy, then once we make sure
87    // the packet is all good we read it fully
88    let mut buffer_copy = Cursor::new(&buffer.get_ref()[buffer.position() as usize..]);
89    // Packet Length
90    let length = match u32::azalea_read_var(&mut buffer_copy) {
91        Ok(length) => length as usize,
92        Err(err) => match err {
93            BufReadError::Io { source } => return Err(FrameSplitterError::Io { source }),
94            _ => return Err(err.into()),
95        },
96    };
97
98    if length > buffer_copy.remaining() {
99        return Err(FrameSplitterError::BadLength {
100            max: buffer_copy.remaining(),
101            size: length,
102        });
103    }
104
105    // we read from the copy and we know it's legit, so we can take those bytes
106    // from the real buffer now
107
108    // the length of the varint that says the length of the whole packet
109    let varint_length = buffer.remaining() - buffer_copy.remaining();
110
111    buffer.advance(varint_length);
112    let data =
113        buffer.get_ref()[buffer.position() as usize..buffer.position() as usize + length].to_vec();
114    buffer.advance(length);
115
116    if buffer.position() == buffer.get_ref().len() as u64 {
117        // reset the inner vec once we've reached the end of the buffer so we don't keep
118        // leaking memory
119        buffer.get_mut().clear();
120
121        // we just cap the capacity to 64KB instead of resetting it to save some
122        // allocations.
123        // and the reason we bother capping it at all is to avoid wasting memory if we
124        // get a big packet once and then never again.
125        buffer.get_mut().shrink_to(1024 * 64);
126
127        buffer.set_position(0);
128    }
129
130    Ok(data.into_boxed_slice())
131}
132
133fn frame_splitter(buffer: &mut Cursor<Vec<u8>>) -> Result<Option<Box<[u8]>>, FrameSplitterError> {
134    // https://tokio.rs/tokio/tutorial/framing
135    let read_frame = parse_frame(buffer);
136    match read_frame {
137        Ok(frame) => return Ok(Some(frame)),
138        Err(err) => match err {
139            FrameSplitterError::BadLength { .. } | FrameSplitterError::Io { .. } => {
140                // we probably just haven't read enough yet
141            }
142            _ => return Err(err),
143        },
144    }
145
146    Ok(None)
147}
148
149pub fn deserialize_packet<P: ProtocolPacket + Debug>(
150    stream: &mut Cursor<&[u8]>,
151) -> Result<P, Box<ReadPacketError>> {
152    // Packet ID
153    let packet_id =
154        u32::azalea_read_var(stream).map_err(|e| ReadPacketError::ReadPacketId { source: e })?;
155    P::read(packet_id, stream)
156}
157
/// Whether `compression_decoder` checks the declared uncompressed size against
/// the compression threshold and the protocol maximum.
///
/// This is always true in multiplayer, false in singleplayer.
static VALIDATE_DECOMPRESSED: bool = true;

/// Largest declared uncompressed packet size, in bytes, that
/// `compression_decoder` accepts (2 MiB).
pub static MAXIMUM_UNCOMPRESSED_LENGTH: u32 = 2 * 1024 * 1024;
162
163#[derive(Error, Debug)]
164pub enum DecompressionError {
165    #[error("Couldn't read VarInt length for data")]
166    LengthReadError {
167        #[from]
168        source: BufReadError,
169    },
170    #[error("Io error")]
171    Io {
172        #[from]
173        #[backtrace]
174        source: std::io::Error,
175    },
176    #[error("Badly compressed packet - size of {size} is below server threshold of {threshold}")]
177    BelowCompressionThreshold { size: u32, threshold: u32 },
178    #[error(
179        "Badly compressed packet - size of {size} is larger than protocol maximum of {maximum}"
180    )]
181    AboveCompressionThreshold { size: u32, maximum: u32 },
182}
183
184/// Get the decompressed bytes from a packet. It must have been decrypted
185/// first.
186pub fn compression_decoder(
187    stream: &mut Cursor<&[u8]>,
188    compression_threshold: u32,
189) -> Result<Box<[u8]>, DecompressionError> {
190    // Data Length
191    let n = u32::azalea_read_var(stream)?;
192    if n == 0 {
193        // no data size, no compression
194        let buf = stream.get_ref()[stream.position() as usize..]
195            .to_vec()
196            .into_boxed_slice();
197        stream.set_position(stream.get_ref().len() as u64);
198        return Ok(buf);
199    }
200
201    if VALIDATE_DECOMPRESSED {
202        if n < compression_threshold {
203            return Err(DecompressionError::BelowCompressionThreshold {
204                size: n,
205                threshold: compression_threshold,
206            });
207        }
208        if n > MAXIMUM_UNCOMPRESSED_LENGTH {
209            return Err(DecompressionError::AboveCompressionThreshold {
210                size: n,
211                maximum: MAXIMUM_UNCOMPRESSED_LENGTH,
212            });
213        }
214    }
215
216    // VALIDATE_DECOMPRESSED should always be true, so the max they can make us
217    // allocate here is 2mb
218    let mut decoded_buf = Vec::with_capacity(n as usize);
219
220    let mut decoder = ZlibDecoder::new(stream);
221    decoder.read_to_end(&mut decoded_buf)?;
222
223    Ok(decoded_buf.into_boxed_slice())
224}
225
226/// Read a single packet from a stream.
227///
228/// The buffer is required because servers may send multiple packets in the
229/// same frame, so we need to store the packet data that's left to read.
230///
231/// The current protocol state must be passed as a generic.
232///
233/// For the non-waiting version, see [`try_read_packet`].
234pub async fn read_packet<P: ProtocolPacket + Debug, R>(
235    stream: &mut R,
236    buffer: &mut Cursor<Vec<u8>>,
237    compression_threshold: Option<u32>,
238    cipher: &mut Option<Aes128CfbDec>,
239) -> Result<P, Box<ReadPacketError>>
240where
241    R: AsyncRead + Unpin + Send + Sync,
242{
243    let raw_packet = read_raw_packet(stream, buffer, compression_threshold, cipher).await?;
244    let packet = deserialize_packet(&mut Cursor::new(&raw_packet))?;
245    Ok(packet)
246}
247
248/// Try to read a single packet from a stream. Returns None if we haven't
249/// received a full packet yet.
250pub fn try_read_packet<P: ProtocolPacket + Debug, R>(
251    stream: &mut R,
252    buffer: &mut Cursor<Vec<u8>>,
253    compression_threshold: Option<u32>,
254    cipher: &mut Option<Aes128CfbDec>,
255) -> Result<Option<P>, Box<ReadPacketError>>
256where
257    R: AsyncRead + Unpin + Send + Sync,
258{
259    let Some(raw_packet) = try_read_raw_packet(stream, buffer, compression_threshold, cipher)?
260    else {
261        return Ok(None);
262    };
263    let packet = deserialize_packet(&mut Cursor::new(&raw_packet))?;
264    Ok(Some(packet))
265}
266
267pub async fn read_raw_packet<R>(
268    stream: &mut R,
269    buffer: &mut Cursor<Vec<u8>>,
270    compression_threshold: Option<u32>,
271    // this has to be a &mut Option<T> instead of an Option<&mut T> because
272    // otherwise the borrow checker complains about the cipher being moved
273    cipher: &mut Option<Aes128CfbDec>,
274) -> Result<Box<[u8]>, Box<ReadPacketError>>
275where
276    R: AsyncRead + Unpin + Send + Sync,
277{
278    loop {
279        if let Some(buf) = read_raw_packet_from_buffer::<R>(buffer, compression_threshold)? {
280            // we got a full packet!!
281            return Ok(buf);
282        };
283
284        let bytes = read_and_decrypt_frame(stream, cipher).await?;
285        buffer.get_mut().extend_from_slice(&bytes);
286    }
287}
288pub fn try_read_raw_packet<R>(
289    stream: &mut R,
290    buffer: &mut Cursor<Vec<u8>>,
291    compression_threshold: Option<u32>,
292    cipher: &mut Option<Aes128CfbDec>,
293) -> Result<Option<Box<[u8]>>, Box<ReadPacketError>>
294where
295    R: AsyncRead + Unpin + Send + Sync,
296{
297    loop {
298        if let Some(buf) = read_raw_packet_from_buffer::<R>(buffer, compression_threshold)? {
299            // we got a full packet!!
300            return Ok(Some(buf));
301        };
302        let Some(bytes) = try_read_and_decrypt_frame(stream, cipher)? else {
303            // no data received
304            return Ok(None);
305        };
306        // we got some data, so add it to the buffer and try again
307        buffer.get_mut().extend_from_slice(&bytes);
308    }
309}
310
311async fn read_and_decrypt_frame<R>(
312    stream: &mut R,
313    cipher: &mut Option<Aes128CfbDec>,
314) -> Result<Box<[u8]>, Box<ReadPacketError>>
315where
316    R: AsyncRead + Unpin + Send + Sync,
317{
318    let mut framed = FramedRead::new(stream, BytesCodec::new());
319
320    let Some(message) = framed.next().await else {
321        return Err(Box::new(ReadPacketError::ConnectionClosed));
322    };
323    let bytes = message.map_err(ReadPacketError::from)?;
324
325    let mut bytes = bytes.to_vec().into_boxed_slice();
326
327    // decrypt if necessary
328    if let Some(cipher) = cipher {
329        azalea_crypto::decrypt_packet(cipher, &mut bytes);
330    }
331
332    Ok(bytes)
333}
334fn try_read_and_decrypt_frame<R>(
335    stream: &mut R,
336    cipher: &mut Option<Aes128CfbDec>,
337) -> Result<Option<Box<[u8]>>, Box<ReadPacketError>>
338where
339    R: AsyncRead + Unpin + Send + Sync,
340{
341    let mut framed = FramedRead::new(stream, BytesCodec::new());
342
343    let Some(message) = future::block_on(future::poll_once(framed.next())) else {
344        // nothing yet
345        return Ok(None);
346    };
347    let Some(message) = message else {
348        return Err(Box::new(ReadPacketError::ConnectionClosed));
349    };
350    let bytes = message.map_err(ReadPacketError::from)?;
351    let mut bytes = bytes.to_vec().into_boxed_slice();
352
353    // decrypt if necessary
354    if let Some(cipher) = cipher {
355        azalea_crypto::decrypt_packet(cipher, &mut bytes);
356    }
357
358    Ok(Some(bytes))
359}
360
361pub fn read_raw_packet_from_buffer<R>(
362    buffer: &mut Cursor<Vec<u8>>,
363    compression_threshold: Option<u32>,
364) -> Result<Option<Box<[u8]>>, Box<ReadPacketError>>
365where
366    R: AsyncRead + Unpin + Send + Sync,
367{
368    let Some(mut buf) = frame_splitter(buffer).map_err(ReadPacketError::from)? else {
369        // no full packet yet :(
370        return Ok(None);
371    };
372
373    if let Some(compression_threshold) = compression_threshold {
374        buf = compression_decoder(&mut Cursor::new(&buf[..]), compression_threshold)
375            .map_err(ReadPacketError::from)?;
376    }
377
378    if tracing::enabled!(tracing::Level::TRACE) {
379        static DO_NOT_CUT_OFF_PACKET_LOGS: LazyLock<bool> = LazyLock::new(|| {
380            env::var("AZALEA_DO_NOT_CUT_OFF_PACKET_LOGS")
381                .map(|s| s == "1" || s == "true")
382                .unwrap_or(false)
383        });
384
385        let buf_string: String = {
386            if !*DO_NOT_CUT_OFF_PACKET_LOGS && buf.len() > 500 {
387                let cut_off_buf = &buf[..500];
388                format!("{cut_off_buf:?}...")
389            } else {
390                format!("{buf:?}")
391            }
392        };
393        trace!("Reading packet with bytes: {buf_string}");
394    };
395
396    Ok(Some(buf))
397}