azalea_client/plugins/
connection.rs

1use std::{
2    fmt::Debug,
3    io::Cursor,
4    mem,
5    sync::{
6        Arc,
7        atomic::{self, AtomicBool},
8    },
9};
10
11use azalea_crypto::Aes128CfbEnc;
12use azalea_protocol::{
13    connect::{RawReadConnection, RawWriteConnection},
14    packets::{
15        ConnectionProtocol, Packet, ProtocolPacket, config::ClientboundConfigPacket,
16        game::ClientboundGamePacket, login::ClientboundLoginPacket,
17    },
18    read::{ReadPacketError, deserialize_packet},
19    write::serialize_packet,
20};
21use bevy_app::prelude::*;
22use bevy_ecs::prelude::*;
23use bevy_tasks::{IoTaskPool, futures_lite::future};
24use thiserror::Error;
25use tokio::{
26    io::AsyncWriteExt,
27    net::tcp::OwnedWriteHalf,
28    sync::mpsc::{self},
29};
30use tracing::{debug, error, info, trace};
31
32use super::packet::{
33    config::ReceiveConfigPacketEvent, game::ReceiveGamePacketEvent, login::ReceiveLoginPacketEvent,
34};
35use crate::packet::{config, game, login};
36
37pub struct ConnectionPlugin;
38impl Plugin for ConnectionPlugin {
39    fn build(&self, app: &mut App) {
40        app.add_systems(PreUpdate, (read_packets, poll_all_writer_tasks).chain());
41    }
42}
43
44pub fn read_packets(ecs: &mut World) {
45    let mut entity_and_conn_query = ecs.query::<(Entity, &mut RawConnection)>();
46    let mut conn_query = ecs.query::<&mut RawConnection>();
47
48    let mut entities_handling_packets = Vec::new();
49    let mut entities_with_injected_packets = Vec::new();
50    for (entity, mut raw_conn) in entity_and_conn_query.iter_mut(ecs) {
51        if !raw_conn.injected_clientbound_packets.is_empty() {
52            entities_with_injected_packets.push((
53                entity,
54                mem::take(&mut raw_conn.injected_clientbound_packets),
55            ));
56        }
57
58        if raw_conn.network.is_none() {
59            // no network connection, don't bother with the normal packet handling
60            continue;
61        }
62
63        entities_handling_packets.push(entity);
64    }
65
66    let mut queued_packet_events = QueuedPacketEvents::default();
67
68    // handle injected packets, see the comment on
69    // RawConnection::injected_clientbound_packets for more info
70    for (entity, raw_packets) in entities_with_injected_packets {
71        for raw_packet in raw_packets {
72            let conn = conn_query.get(ecs, entity).unwrap();
73            let state = conn.state;
74
75            trace!("Received injected packet with bytes: {raw_packet:?}");
76            if let Err(e) =
77                handle_raw_packet(ecs, &raw_packet, entity, state, &mut queued_packet_events)
78            {
79                error!("Error reading injected packet: {e}");
80            }
81        }
82    }
83
84    for entity in entities_handling_packets {
85        loop {
86            let mut conn = conn_query.get_mut(ecs, entity).unwrap();
87            let net_conn = conn.net_conn().unwrap();
88            let read_res = net_conn.reader.try_read();
89            let state = conn.state;
90            match read_res {
91                Ok(Some(raw_packet)) => {
92                    let raw_packet = Arc::<[u8]>::from(raw_packet);
93                    if let Err(e) = handle_raw_packet(
94                        ecs,
95                        &raw_packet,
96                        entity,
97                        state,
98                        &mut queued_packet_events,
99                    ) {
100                        error!("Error reading packet: {e}");
101                    }
102                }
103                Ok(None) => {
104                    // no packets available
105                    break;
106                }
107                Err(err) => {
108                    log_for_error(&err);
109
110                    if matches!(
111                        &*err,
112                        ReadPacketError::IoError { .. } | ReadPacketError::ConnectionClosed
113                    ) {
114                        info!("Server closed connection");
115                        // ungraceful disconnect :(
116                        conn.network = None;
117                        // setting this will make us send a DisconnectEvent
118                        conn.is_alive = false;
119                    }
120
121                    break;
122                }
123            }
124        }
125    }
126
127    queued_packet_events.write_messages(ecs);
128}
129
130fn poll_all_writer_tasks(mut conn_query: Query<&mut RawConnection>) {
131    for mut conn in conn_query.iter_mut() {
132        if let Some(net_conn) = &mut conn.network {
133            // this needs to be done at some point every update to make sure packets are
134            // actually sent to the network
135
136            if net_conn.poll_writer().is_some() {
137                // means the writer task ended
138                conn.network = None;
139                conn.is_alive = false;
140            }
141        }
142    }
143}
144
145#[derive(Default)]
146pub struct QueuedPacketEvents {
147    login: Vec<ReceiveLoginPacketEvent>,
148    config: Vec<ReceiveConfigPacketEvent>,
149    game: Vec<ReceiveGamePacketEvent>,
150}
151impl QueuedPacketEvents {
152    fn write_messages(&mut self, ecs: &mut World) {
153        ecs.write_message_batch(self.login.drain(..));
154        ecs.write_message_batch(self.config.drain(..));
155        ecs.write_message_batch(self.game.drain(..));
156    }
157}
158
159fn log_for_error(error: &ReadPacketError) {
160    if !matches!(*error, ReadPacketError::ConnectionClosed) {
161        error!("Error reading packet from Client: {error:?}");
162    }
163}
164
165/// The client's connection to the server.
166#[derive(Component)]
167pub struct RawConnection {
168    /// The network connection to the server.
169    ///
170    /// This isn't guaranteed to be present, for example during the main packet
171    /// handlers or at all times during tests.
172    ///
173    /// You shouldn't rely on this. Instead, use the events for sending packets
174    /// like [`SendGamePacketEvent`](crate::packet::game::SendGamePacketEvent) /
175    /// [`SendConfigPacketEvent`](crate::packet::config::SendConfigPacketEvent)
176    /// / [`SendLoginPacketEvent`](crate::packet::login::SendLoginPacketEvent).
177    ///
178    /// To check if we haven't disconnected from the server, use
179    /// [`Self::is_alive`].
180    pub(crate) network: Option<NetworkConnection>,
181    pub state: ConnectionProtocol,
182    pub(crate) is_alive: bool,
183
184    /// This exists for internal testing purposes and probably shouldn't be used
185    /// for normal bots.
186    ///
187    /// It's basically a way to make our client think it received a packet from
188    /// the server without needing to interact with the network.
189    pub injected_clientbound_packets: Vec<Box<[u8]>>,
190}
191impl RawConnection {
192    pub fn new(
193        reader: RawReadConnection,
194        writer: RawWriteConnection,
195        state: ConnectionProtocol,
196    ) -> Self {
197        let task_pool = IoTaskPool::get();
198
199        let (network_packet_writer_tx, network_packet_writer_rx) =
200            mpsc::unbounded_channel::<Box<[u8]>>();
201
202        let writer_task =
203            task_pool.spawn(write_task(network_packet_writer_rx, writer.write_stream));
204
205        let mut conn = Self::new_networkless(state);
206        conn.network = Some(NetworkConnection {
207            reader,
208            enc_cipher: writer.enc_cipher,
209            network_packet_writer_tx,
210            writer_task,
211        });
212
213        conn
214    }
215
216    pub fn new_networkless(state: ConnectionProtocol) -> Self {
217        Self {
218            network: None,
219            state,
220            is_alive: true,
221            injected_clientbound_packets: Vec::new(),
222        }
223    }
224
225    pub fn is_alive(&self) -> bool {
226        self.is_alive
227    }
228
229    /// Write a packet to the server without emitting any events.
230    ///
231    /// This is called by the handlers for [`SendGamePacketEvent`],
232    /// [`SendConfigPacketEvent`], and [`SendLoginPacketEvent`].
233    ///
234    /// [`SendGamePacketEvent`]: crate::packet::game::SendGamePacketEvent
235    /// [`SendConfigPacketEvent`]: crate::packet::config::SendConfigPacketEvent
236    /// [`SendLoginPacketEvent`]: crate::packet::login::SendLoginPacketEvent
237    pub fn write<P: ProtocolPacket + Debug>(
238        &mut self,
239        packet: impl Packet<P>,
240    ) -> Result<(), WritePacketError> {
241        if let Some(network) = &mut self.network {
242            network.write(packet)?;
243        } else {
244            static WARNED: AtomicBool = AtomicBool::new(false);
245            if !WARNED.swap(true, atomic::Ordering::Relaxed) {
246                debug!(
247                    "tried to write packet to the network but there is no NetworkConnection. if you're trying to send a packet from the handler function, use self.write instead"
248                );
249            }
250        }
251        Ok(())
252    }
253
254    pub fn net_conn(&mut self) -> Option<&mut NetworkConnection> {
255        self.network.as_mut()
256    }
257}
258
259pub fn handle_raw_packet(
260    ecs: &mut World,
261    raw_packet: &[u8],
262    entity: Entity,
263    state: ConnectionProtocol,
264    queued_packet_events: &mut QueuedPacketEvents,
265) -> Result<(), Box<ReadPacketError>> {
266    let stream = &mut Cursor::new(raw_packet);
267    match state {
268        ConnectionProtocol::Handshake => {
269            unreachable!()
270        }
271        ConnectionProtocol::Game => {
272            let packet = Arc::new(deserialize_packet::<ClientboundGamePacket>(stream)?);
273            trace!("Packet: {packet:?}");
274            game::process_packet(ecs, entity, packet.as_ref());
275            queued_packet_events
276                .game
277                .push(ReceiveGamePacketEvent { entity, packet });
278        }
279        ConnectionProtocol::Status => {
280            unreachable!()
281        }
282        ConnectionProtocol::Login => {
283            let packet = Arc::new(deserialize_packet::<ClientboundLoginPacket>(stream)?);
284            trace!("Packet: {packet:?}");
285            login::process_packet(ecs, entity, &packet);
286            queued_packet_events
287                .login
288                .push(ReceiveLoginPacketEvent { entity, packet });
289        }
290        ConnectionProtocol::Configuration => {
291            let packet = Arc::new(deserialize_packet::<ClientboundConfigPacket>(stream)?);
292            trace!("Packet: {packet:?}");
293            config::process_packet(ecs, entity, &packet);
294            queued_packet_events
295                .config
296                .push(ReceiveConfigPacketEvent { entity, packet });
297        }
298    };
299
300    Ok(())
301}
302
303pub struct NetworkConnection {
304    reader: RawReadConnection,
305    // compression threshold is in the RawReadConnection
306    pub enc_cipher: Option<Aes128CfbEnc>,
307
308    pub writer_task: bevy_tasks::Task<()>,
309    /// A queue of raw TCP packets to send.
310    ///
311    /// These will not be modified further, they should already be serialized
312    /// and compressed and encrypted before being added here.
313    network_packet_writer_tx: mpsc::UnboundedSender<Box<[u8]>>,
314}
315impl NetworkConnection {
316    pub fn write<P: ProtocolPacket + Debug>(
317        &mut self,
318        packet: impl Packet<P>,
319    ) -> Result<(), WritePacketError> {
320        let packet = packet.into_variant();
321        let raw_packet = serialize_packet(&packet)?;
322        self.write_raw(&raw_packet)?;
323
324        Ok(())
325    }
326
327    pub fn write_raw(&mut self, raw_packet: &[u8]) -> Result<(), WritePacketError> {
328        let network_packet = azalea_protocol::write::encode_to_network_packet(
329            raw_packet,
330            self.reader.compression_threshold,
331            &mut self.enc_cipher,
332        );
333        self.network_packet_writer_tx
334            .send(network_packet.into_boxed_slice())?;
335        Ok(())
336    }
337
338    /// Makes sure packets get sent and returns Some(()) if the connection has
339    /// closed.
340    pub fn poll_writer(&mut self) -> Option<()> {
341        let poll_once_res = future::poll_once(&mut self.writer_task);
342        future::block_on(poll_once_res)
343    }
344
345    pub fn set_compression_threshold(&mut self, threshold: Option<u32>) {
346        trace!("Set compression threshold to {threshold:?}");
347        self.reader.compression_threshold = threshold;
348    }
349    /// Set the encryption key that is used to encrypt and decrypt packets.
350    ///
351    /// The same key is used for both reading and writing.
352    pub fn set_encryption_key(&mut self, key: [u8; 16]) {
353        trace!("Enabled protocol encryption");
354        let (enc_cipher, dec_cipher) = azalea_crypto::create_cipher(&key);
355        self.reader.dec_cipher = Some(dec_cipher);
356        self.enc_cipher = Some(enc_cipher);
357    }
358}
359
360async fn write_task(
361    mut network_packet_writer_rx: mpsc::UnboundedReceiver<Box<[u8]>>,
362    mut write_half: OwnedWriteHalf,
363) {
364    while let Some(network_packet) = network_packet_writer_rx.recv().await {
365        if let Err(e) = write_half.write_all(&network_packet).await {
366            debug!("Error writing packet to server: {e}");
367            break;
368        };
369    }
370
371    trace!("write task is done");
372}
373
374#[derive(Error, Debug)]
375pub enum WritePacketError {
376    #[error("Wrong protocol state: expected {expected:?}, got {got:?}")]
377    WrongState {
378        expected: ConnectionProtocol,
379        got: ConnectionProtocol,
380    },
381    #[error(transparent)]
382    Encoding(#[from] azalea_protocol::write::PacketEncodeError),
383    #[error(transparent)]
384    SendError {
385        #[from]
386        #[backtrace]
387        source: mpsc::error::SendError<Box<[u8]>>,
388    },
389}