azalea_protocol/
read.rs

1//! Read packets from a stream.
2
3use std::{
4    backtrace::Backtrace,
5    env,
6    fmt::Debug,
7    io,
8    io::{Cursor, Read},
9    sync::LazyLock,
10};
11
12use azalea_buf::{AzaleaReadVar, BufReadError};
13use azalea_crypto::Aes128CfbDec;
14use flate2::read::ZlibDecoder;
15use futures::StreamExt;
16use futures_lite::future;
17use thiserror::Error;
18use tokio::io::AsyncRead;
19use tokio_util::{
20    bytes::Buf,
21    codec::{BytesCodec, FramedRead},
22};
23use tracing::trace;
24
25use crate::packets::ProtocolPacket;
26
27#[derive(Error, Debug)]
28pub enum ReadPacketError {
29    #[error("Error reading packet {packet_name} (id {packet_id}): {source}")]
30    Parse {
31        packet_id: u32,
32        packet_name: String,
33        backtrace: Box<Backtrace>,
34        source: BufReadError,
35    },
36    #[error("Unknown packet id {id} in state {state_name}")]
37    UnknownPacketId { state_name: String, id: u32 },
38    #[error("Couldn't read packet id")]
39    ReadPacketId { source: BufReadError },
40    #[error(transparent)]
41    Decompress {
42        #[from]
43        #[backtrace]
44        source: DecompressionError,
45    },
46    #[error(transparent)]
47    FrameSplitter {
48        #[from]
49        #[backtrace]
50        source: FrameSplitterError,
51    },
52    #[error("Leftover data after reading packet {packet_name}: {data:?}")]
53    LeftoverData { data: Vec<u8>, packet_name: String },
54    #[error(transparent)]
55    IoError {
56        #[from]
57        #[backtrace]
58        source: io::Error,
59    },
60    #[error("Connection closed")]
61    ConnectionClosed,
62}
63
64#[derive(Error, Debug)]
65pub enum FrameSplitterError {
66    #[error("Couldn't read VarInt length for packet. The previous packet may have been corrupted")]
67    LengthRead {
68        #[from]
69        source: BufReadError,
70    },
71    #[error("Io error")]
72    Io {
73        #[from]
74        #[backtrace]
75        source: io::Error,
76    },
77    #[error("Packet is longer than {max} bytes (is {size})")]
78    BadLength { max: usize, size: usize },
79    #[error("Connection reset by peer")]
80    ConnectionReset,
81    #[error("Connection closed")]
82    ConnectionClosed,
83}
84
85/// Read a length, then read that amount of bytes from the `Cursor<Vec<u8>>`. If
86/// there's not enough data, return None
87fn parse_frame(buffer: &mut Cursor<Vec<u8>>) -> Result<Box<[u8]>, FrameSplitterError> {
88    // copy the buffer first and read from the copy, then once we make sure
89    // the packet is all good we read it fully
90    let mut buffer_copy = Cursor::new(&buffer.get_ref()[buffer.position() as usize..]);
91    // Packet Length
92    let length = match u32::azalea_read_var(&mut buffer_copy) {
93        Ok(length) => length as usize,
94        Err(err) => match err {
95            BufReadError::Io { source } => return Err(FrameSplitterError::Io { source }),
96            _ => return Err(err.into()),
97        },
98    };
99
100    if length > buffer_copy.remaining() {
101        return Err(FrameSplitterError::BadLength {
102            max: buffer_copy.remaining(),
103            size: length,
104        });
105    }
106
107    // we read from the copy and we know it's legit, so we can take those bytes
108    // from the real buffer now
109
110    // the length of the varint that says the length of the whole packet
111    let varint_length = buffer.remaining() - buffer_copy.remaining();
112
113    buffer.advance(varint_length);
114    let data =
115        buffer.get_ref()[buffer.position() as usize..buffer.position() as usize + length].to_vec();
116    buffer.advance(length);
117
118    if buffer.position() == buffer.get_ref().len() as u64 {
119        // reset the inner vec once we've reached the end of the buffer so we don't keep
120        // leaking memory
121        buffer.get_mut().clear();
122
123        // we just cap the capacity to 64KB instead of resetting it to save some
124        // allocations.
125        // and the reason we bother capping it at all is to avoid wasting memory if we
126        // get a big packet once and then never again.
127        buffer.get_mut().shrink_to(1024 * 64);
128
129        buffer.set_position(0);
130    }
131
132    Ok(data.into_boxed_slice())
133}
134
135fn frame_splitter(buffer: &mut Cursor<Vec<u8>>) -> Result<Option<Box<[u8]>>, FrameSplitterError> {
136    // https://tokio.rs/tokio/tutorial/framing
137    let read_frame = parse_frame(buffer);
138    match read_frame {
139        Ok(frame) => return Ok(Some(frame)),
140        Err(err) => match err {
141            FrameSplitterError::BadLength { .. } | FrameSplitterError::Io { .. } => {
142                // we probably just haven't read enough yet
143            }
144            _ => return Err(err),
145        },
146    }
147
148    Ok(None)
149}
150
151pub fn deserialize_packet<P: ProtocolPacket + Debug>(
152    stream: &mut Cursor<&[u8]>,
153) -> Result<P, Box<ReadPacketError>> {
154    // Packet ID
155    let packet_id =
156        u32::azalea_read_var(stream).map_err(|e| ReadPacketError::ReadPacketId { source: e })?;
157    P::read(packet_id, stream)
158}
159
// Whether the declared uncompressed size of a packet is validated before
// decompressing. This is always true in multiplayer, false in singleplayer.
static VALIDATE_DECOMPRESSED: bool = true;

/// The largest declared uncompressed packet size that is accepted, in bytes
/// (8 MiB).
pub static MAXIMUM_UNCOMPRESSED_LENGTH: u32 = 8 * 1024 * 1024;
164
165#[derive(Error, Debug)]
166pub enum DecompressionError {
167    #[error("Couldn't read VarInt length for data")]
168    LengthReadError {
169        #[from]
170        source: BufReadError,
171    },
172    #[error("Io error")]
173    Io {
174        #[from]
175        #[backtrace]
176        source: io::Error,
177    },
178    #[error("Badly compressed packet - size of {size} is below server threshold of {threshold}")]
179    BelowCompressionThreshold { size: u32, threshold: u32 },
180    #[error(
181        "Badly compressed packet - size of {size} is larger than protocol maximum of {maximum}"
182    )]
183    AboveCompressionThreshold { size: u32, maximum: u32 },
184}
185
186/// Get the decompressed bytes from a packet. It must have been decrypted
187/// first.
188pub fn compression_decoder(
189    stream: &mut Cursor<&[u8]>,
190    compression_threshold: u32,
191) -> Result<Box<[u8]>, DecompressionError> {
192    // Data Length
193    let n = u32::azalea_read_var(stream)?;
194    if n == 0 {
195        // no data size, no compression
196        let buf = stream.get_ref()[stream.position() as usize..]
197            .to_vec()
198            .into_boxed_slice();
199        stream.set_position(stream.get_ref().len() as u64);
200        return Ok(buf);
201    }
202
203    if VALIDATE_DECOMPRESSED {
204        if n < compression_threshold {
205            return Err(DecompressionError::BelowCompressionThreshold {
206                size: n,
207                threshold: compression_threshold,
208            });
209        }
210        if n > MAXIMUM_UNCOMPRESSED_LENGTH {
211            return Err(DecompressionError::AboveCompressionThreshold {
212                size: n,
213                maximum: MAXIMUM_UNCOMPRESSED_LENGTH,
214            });
215        }
216    }
217
218    // VALIDATE_DECOMPRESSED should always be true, so the max they can make us
219    // allocate here is 2mb
220    let mut decoded_buf = Vec::with_capacity(n as usize);
221
222    let mut decoder = ZlibDecoder::new(stream);
223    decoder.read_to_end(&mut decoded_buf)?;
224
225    Ok(decoded_buf.into_boxed_slice())
226}
227
228/// Read a single packet from a stream.
229///
230/// The buffer is required because servers may send multiple packets in the
231/// same frame, so we need to store the packet data that's left to read.
232///
233/// The current protocol state must be passed as a generic.
234///
235/// For the non-waiting version, see [`try_read_packet`].
236pub async fn read_packet<P: ProtocolPacket + Debug, R>(
237    stream: &mut R,
238    buffer: &mut Cursor<Vec<u8>>,
239    compression_threshold: Option<u32>,
240    cipher: &mut Option<Aes128CfbDec>,
241) -> Result<P, Box<ReadPacketError>>
242where
243    R: AsyncRead + Unpin + Send + Sync,
244{
245    let raw_packet = read_raw_packet(stream, buffer, compression_threshold, cipher).await?;
246    let packet = deserialize_packet(&mut Cursor::new(&raw_packet))?;
247    Ok(packet)
248}
249
250/// Try to read a single packet from a stream. Returns None if we haven't
251/// received a full packet yet.
252pub fn try_read_packet<P: ProtocolPacket + Debug, R>(
253    stream: &mut R,
254    buffer: &mut Cursor<Vec<u8>>,
255    compression_threshold: Option<u32>,
256    cipher: &mut Option<Aes128CfbDec>,
257) -> Result<Option<P>, Box<ReadPacketError>>
258where
259    R: AsyncRead + Unpin + Send + Sync,
260{
261    let Some(raw_packet) = try_read_raw_packet(stream, buffer, compression_threshold, cipher)?
262    else {
263        return Ok(None);
264    };
265    let packet = deserialize_packet(&mut Cursor::new(&raw_packet))?;
266    Ok(Some(packet))
267}
268
269pub async fn read_raw_packet<R>(
270    stream: &mut R,
271    buffer: &mut Cursor<Vec<u8>>,
272    compression_threshold: Option<u32>,
273    // this has to be a &mut Option<T> instead of an Option<&mut T> because
274    // otherwise the borrow checker complains about the cipher being moved
275    cipher: &mut Option<Aes128CfbDec>,
276) -> Result<Box<[u8]>, Box<ReadPacketError>>
277where
278    R: AsyncRead + Unpin + Send + Sync,
279{
280    loop {
281        if let Some(buf) = read_raw_packet_from_buffer::<R>(buffer, compression_threshold)? {
282            // we got a full packet!!
283            return Ok(buf);
284        };
285
286        let bytes = read_and_decrypt_frame(stream, cipher).await?;
287        buffer.get_mut().extend_from_slice(&bytes);
288    }
289}
290/// Read a packet from the stream, then if necessary decrypt it, decompress
291/// it, and split it.
292pub fn try_read_raw_packet<R>(
293    stream: &mut R,
294    buffer: &mut Cursor<Vec<u8>>,
295    compression_threshold: Option<u32>,
296    cipher: &mut Option<Aes128CfbDec>,
297) -> Result<Option<Box<[u8]>>, Box<ReadPacketError>>
298where
299    R: AsyncRead + Unpin + Send + Sync,
300{
301    loop {
302        if let Some(buf) = read_raw_packet_from_buffer::<R>(buffer, compression_threshold)? {
303            // we got a full packet!!
304            return Ok(Some(buf));
305        };
306        let Some(bytes) = try_read_and_decrypt_frame(stream, cipher)? else {
307            // no data received
308            return Ok(None);
309        };
310        // we got some data, so add it to the buffer and try again
311        buffer.get_mut().extend_from_slice(&bytes);
312    }
313}
314
315async fn read_and_decrypt_frame<R>(
316    stream: &mut R,
317    cipher: &mut Option<Aes128CfbDec>,
318) -> Result<Box<[u8]>, Box<ReadPacketError>>
319where
320    R: AsyncRead + Unpin + Send + Sync,
321{
322    let mut framed = FramedRead::new(stream, BytesCodec::new());
323
324    let Some(message) = framed.next().await else {
325        return Err(Box::new(ReadPacketError::ConnectionClosed));
326    };
327    let bytes = message.map_err(ReadPacketError::from)?;
328
329    let mut bytes = bytes.to_vec().into_boxed_slice();
330
331    // decrypt if necessary
332    if let Some(cipher) = cipher {
333        azalea_crypto::decrypt_packet(cipher, &mut bytes);
334    }
335
336    Ok(bytes)
337}
338fn try_read_and_decrypt_frame<R>(
339    stream: &mut R,
340    cipher: &mut Option<Aes128CfbDec>,
341) -> Result<Option<Box<[u8]>>, Box<ReadPacketError>>
342where
343    R: AsyncRead + Unpin + Send + Sync,
344{
345    let mut framed = FramedRead::new(stream, BytesCodec::new());
346
347    let Some(message) = future::block_on(future::poll_once(framed.next())) else {
348        // nothing yet
349        return Ok(None);
350    };
351    let Some(message) = message else {
352        return Err(Box::new(ReadPacketError::ConnectionClosed));
353    };
354    let bytes = message.map_err(ReadPacketError::from)?;
355    let mut bytes = bytes.to_vec().into_boxed_slice();
356
357    // decrypt if necessary
358    if let Some(cipher) = cipher {
359        azalea_crypto::decrypt_packet(cipher, &mut bytes);
360    }
361
362    Ok(Some(bytes))
363}
364
365pub fn read_raw_packet_from_buffer<R>(
366    buffer: &mut Cursor<Vec<u8>>,
367    compression_threshold: Option<u32>,
368) -> Result<Option<Box<[u8]>>, Box<ReadPacketError>>
369where
370    R: AsyncRead + Unpin + Send + Sync,
371{
372    let Some(mut buf) = frame_splitter(buffer).map_err(ReadPacketError::from)? else {
373        // no full packet yet :(
374        return Ok(None);
375    };
376
377    if let Some(compression_threshold) = compression_threshold {
378        buf = compression_decoder(&mut Cursor::new(&buf[..]), compression_threshold)
379            .map_err(ReadPacketError::from)?;
380    }
381
382    if tracing::enabled!(tracing::Level::TRACE) {
383        static DO_NOT_CUT_OFF_PACKET_LOGS: LazyLock<bool> = LazyLock::new(|| {
384            env::var("AZALEA_DO_NOT_CUT_OFF_PACKET_LOGS")
385                .map(|s| s == "1" || s == "true")
386                .unwrap_or(false)
387        });
388
389        let buf_string: String = {
390            if !*DO_NOT_CUT_OFF_PACKET_LOGS && buf.len() > 500 {
391                let cut_off_buf = &buf[..500];
392                format!("{cut_off_buf:?}...")
393            } else {
394                format!("{buf:?}")
395            }
396        };
397        trace!("Reading packet with bytes: {buf_string}");
398    };
399
400    Ok(Some(buf))
401}