azalea_client/plugins/
connection.rs

1use std::{fmt::Debug, io::Cursor, mem, sync::Arc};
2
3use azalea_crypto::Aes128CfbEnc;
4use azalea_protocol::{
5    connect::{RawReadConnection, RawWriteConnection},
6    packets::{
7        ConnectionProtocol, Packet, ProtocolPacket, config::ClientboundConfigPacket,
8        game::ClientboundGamePacket, login::ClientboundLoginPacket,
9    },
10    read::{ReadPacketError, deserialize_packet},
11    write::serialize_packet,
12};
13use bevy_app::prelude::*;
14use bevy_ecs::prelude::*;
15use bevy_tasks::{IoTaskPool, futures_lite::future};
16use thiserror::Error;
17use tokio::{
18    io::AsyncWriteExt,
19    net::tcp::OwnedWriteHalf,
20    sync::mpsc::{self},
21};
22use tracing::{debug, error, info, trace};
23
24use super::packet::{
25    config::ReceiveConfigPacketEvent, game::ReceiveGamePacketEvent, login::ReceiveLoginPacketEvent,
26};
27use crate::packet::{config, game, login};
28
29pub struct ConnectionPlugin;
30impl Plugin for ConnectionPlugin {
31    fn build(&self, app: &mut App) {
32        app.add_systems(PreUpdate, (read_packets, poll_all_writer_tasks).chain());
33    }
34}
35
36pub fn read_packets(ecs: &mut World) {
37    let mut entity_and_conn_query = ecs.query::<(Entity, &mut RawConnection)>();
38    let mut conn_query = ecs.query::<&mut RawConnection>();
39
40    let mut entities_handling_packets = Vec::new();
41    let mut entities_with_injected_packets = Vec::new();
42    for (entity, mut raw_conn) in entity_and_conn_query.iter_mut(ecs) {
43        if !raw_conn.injected_clientbound_packets.is_empty() {
44            entities_with_injected_packets.push((
45                entity,
46                mem::take(&mut raw_conn.injected_clientbound_packets),
47            ));
48        }
49
50        if raw_conn.network.is_none() {
51            // no network connection, don't bother with the normal packet handling
52            continue;
53        }
54
55        entities_handling_packets.push(entity);
56    }
57
58    let mut queued_packet_events = QueuedPacketEvents::default();
59
60    // handle injected packets, see the comment on
61    // RawConnection::injected_clientbound_packets for more info
62    for (entity, raw_packets) in entities_with_injected_packets {
63        for raw_packet in raw_packets {
64            let conn = conn_query.get(ecs, entity).unwrap();
65            let state = conn.state;
66
67            trace!("Received injected packet with bytes: {raw_packet:?}");
68            if let Err(e) =
69                handle_raw_packet(ecs, &raw_packet, entity, state, &mut queued_packet_events)
70            {
71                error!("Error reading injected packet: {e}");
72            }
73        }
74    }
75
76    for entity in entities_handling_packets {
77        loop {
78            let mut conn = conn_query.get_mut(ecs, entity).unwrap();
79            let net_conn = conn.net_conn().unwrap();
80            let read_res = net_conn.reader.try_read();
81            let state = conn.state;
82            match read_res {
83                Ok(Some(raw_packet)) => {
84                    let raw_packet = Arc::<[u8]>::from(raw_packet);
85                    if let Err(e) = handle_raw_packet(
86                        ecs,
87                        &raw_packet,
88                        entity,
89                        state,
90                        &mut queued_packet_events,
91                    ) {
92                        error!("Error reading packet: {e}");
93                    }
94                }
95                Ok(None) => {
96                    // no packets available
97                    break;
98                }
99                Err(err) => {
100                    log_for_error(&err);
101
102                    if matches!(
103                        &*err,
104                        ReadPacketError::IoError { .. } | ReadPacketError::ConnectionClosed
105                    ) {
106                        info!("Server closed connection");
107                        // ungraceful disconnect :(
108                        conn.network = None;
109                        // setting this will make us send a DisconnectEvent
110                        conn.is_alive = false;
111                    }
112
113                    break;
114                }
115            }
116        }
117    }
118
119    queued_packet_events.send_events(ecs);
120}
121
122fn poll_all_writer_tasks(mut conn_query: Query<&mut RawConnection>) {
123    for mut conn in conn_query.iter_mut() {
124        if let Some(net_conn) = &mut conn.network {
125            // this needs to be done at some point every update to make sure packets are
126            // actually sent to the network
127
128            if net_conn.poll_writer().is_some() {
129                // means the writer task ended
130                conn.network = None;
131                conn.is_alive = false;
132            }
133        }
134    }
135}
136
137#[derive(Default)]
138pub struct QueuedPacketEvents {
139    login: Vec<ReceiveLoginPacketEvent>,
140    config: Vec<ReceiveConfigPacketEvent>,
141    game: Vec<ReceiveGamePacketEvent>,
142}
143impl QueuedPacketEvents {
144    fn send_events(&mut self, ecs: &mut World) {
145        ecs.send_event_batch(self.login.drain(..));
146        ecs.send_event_batch(self.config.drain(..));
147        ecs.send_event_batch(self.game.drain(..));
148    }
149}
150
151fn log_for_error(error: &ReadPacketError) {
152    if !matches!(*error, ReadPacketError::ConnectionClosed) {
153        error!("Error reading packet from Client: {error:?}");
154    }
155}
156
157/// The client's connection to the server.
158#[derive(Component)]
159pub struct RawConnection {
160    /// The network connection to the server.
161    ///
162    /// This isn't guaranteed to be present, for example during the main packet
163    /// handlers or at all times during tests.
164    ///
165    /// You shouldn't rely on this. Instead, use the events for sending packets
166    /// like [`SendPacketEvent`](crate::packet::game::SendPacketEvent) /
167    /// [`SendConfigPacketEvent`](crate::packet::config::SendConfigPacketEvent)
168    /// / [`SendLoginPacketEvent`](crate::packet::login::SendLoginPacketEvent).
169    ///
170    /// To check if we haven't disconnected from the server, use
171    /// [`Self::is_alive`].
172    network: Option<NetworkConnection>,
173    pub state: ConnectionProtocol,
174    is_alive: bool,
175
176    /// This exists for internal testing purposes and probably shouldn't be used
177    /// for normal bots. It's basically a way to make our client think it
178    /// received a packet from the server without needing to interact with the
179    /// network.
180    pub injected_clientbound_packets: Vec<Box<[u8]>>,
181}
182impl RawConnection {
183    pub fn new(
184        reader: RawReadConnection,
185        writer: RawWriteConnection,
186        state: ConnectionProtocol,
187    ) -> Self {
188        let task_pool = IoTaskPool::get();
189
190        let (network_packet_writer_tx, network_packet_writer_rx) =
191            mpsc::unbounded_channel::<Box<[u8]>>();
192
193        let writer_task =
194            task_pool.spawn(write_task(network_packet_writer_rx, writer.write_stream));
195
196        let mut conn = Self::new_networkless(state);
197        conn.network = Some(NetworkConnection {
198            reader,
199            enc_cipher: writer.enc_cipher,
200            network_packet_writer_tx,
201            writer_task,
202        });
203
204        conn
205    }
206
207    pub fn new_networkless(state: ConnectionProtocol) -> Self {
208        Self {
209            network: None,
210            state,
211            is_alive: true,
212            injected_clientbound_packets: Vec::new(),
213        }
214    }
215
216    pub fn is_alive(&self) -> bool {
217        self.is_alive
218    }
219
220    /// Write a packet to the server without emitting any events.
221    ///
222    /// This is called by the handlers for [`SendPacketEvent`],
223    /// [`SendConfigPacketEvent`], and [`SendLoginPacketEvent`].
224    ///
225    /// [`SendPacketEvent`]: crate::packet::game::SendPacketEvent
226    /// [`SendConfigPacketEvent`]: crate::packet::config::SendConfigPacketEvent
227    /// [`SendLoginPacketEvent`]: crate::packet::login::SendLoginPacketEvent
228    pub fn write<P: ProtocolPacket + Debug>(
229        &mut self,
230        packet: impl Packet<P>,
231    ) -> Result<(), WritePacketError> {
232        if let Some(network) = &mut self.network {
233            network.write(packet)?;
234        } else {
235            debug!(
236                "tried to write packet to the network but there is no NetworkConnection. if you're trying to send a packet from the handler function, use self.write instead"
237            );
238        }
239        Ok(())
240    }
241
242    pub fn net_conn(&mut self) -> Option<&mut NetworkConnection> {
243        self.network.as_mut()
244    }
245}
246
247pub fn handle_raw_packet(
248    ecs: &mut World,
249    raw_packet: &[u8],
250    entity: Entity,
251    state: ConnectionProtocol,
252    queued_packet_events: &mut QueuedPacketEvents,
253) -> Result<(), Box<ReadPacketError>> {
254    let stream = &mut Cursor::new(raw_packet);
255    match state {
256        ConnectionProtocol::Handshake => {
257            unreachable!()
258        }
259        ConnectionProtocol::Game => {
260            let packet = Arc::new(deserialize_packet::<ClientboundGamePacket>(stream)?);
261            trace!("Packet: {packet:?}");
262            game::process_packet(ecs, entity, packet.as_ref());
263            queued_packet_events
264                .game
265                .push(ReceiveGamePacketEvent { entity, packet });
266        }
267        ConnectionProtocol::Status => {
268            unreachable!()
269        }
270        ConnectionProtocol::Login => {
271            let packet = Arc::new(deserialize_packet::<ClientboundLoginPacket>(stream)?);
272            trace!("Packet: {packet:?}");
273            login::process_packet(ecs, entity, &packet);
274            queued_packet_events
275                .login
276                .push(ReceiveLoginPacketEvent { entity, packet });
277        }
278        ConnectionProtocol::Configuration => {
279            let packet = Arc::new(deserialize_packet::<ClientboundConfigPacket>(stream)?);
280            trace!("Packet: {packet:?}");
281            config::process_packet(ecs, entity, &packet);
282            queued_packet_events
283                .config
284                .push(ReceiveConfigPacketEvent { entity, packet });
285        }
286    };
287
288    Ok(())
289}
290
291pub struct NetworkConnection {
292    reader: RawReadConnection,
293    // compression threshold is in the RawReadConnection
294    pub enc_cipher: Option<Aes128CfbEnc>,
295
296    pub writer_task: bevy_tasks::Task<()>,
297    /// A queue of raw TCP packets to send. These will not be modified further,
298    /// they should already be serialized and encrypted and everything before
299    /// being added here.
300    network_packet_writer_tx: mpsc::UnboundedSender<Box<[u8]>>,
301}
302impl NetworkConnection {
303    pub fn write<P: ProtocolPacket + Debug>(
304        &mut self,
305        packet: impl Packet<P>,
306    ) -> Result<(), WritePacketError> {
307        let packet = packet.into_variant();
308        let raw_packet = serialize_packet(&packet)?;
309        self.write_raw(&raw_packet)?;
310
311        Ok(())
312    }
313
314    pub fn write_raw(&mut self, raw_packet: &[u8]) -> Result<(), WritePacketError> {
315        let network_packet = azalea_protocol::write::encode_to_network_packet(
316            raw_packet,
317            self.reader.compression_threshold,
318            &mut self.enc_cipher,
319        );
320        self.network_packet_writer_tx
321            .send(network_packet.into_boxed_slice())?;
322        Ok(())
323    }
324
325    /// Makes sure packets get sent and returns Some(()) if the connection has
326    /// closed.
327    pub fn poll_writer(&mut self) -> Option<()> {
328        let poll_once_res = future::poll_once(&mut self.writer_task);
329        future::block_on(poll_once_res)
330    }
331
332    pub fn set_compression_threshold(&mut self, threshold: Option<u32>) {
333        trace!("Set compression threshold to {threshold:?}");
334        self.reader.compression_threshold = threshold;
335    }
336    /// Set the encryption key that is used to encrypt and decrypt packets. It's
337    /// the same for both reading and writing.
338    pub fn set_encryption_key(&mut self, key: [u8; 16]) {
339        trace!("Enabled protocol encryption");
340        let (enc_cipher, dec_cipher) = azalea_crypto::create_cipher(&key);
341        self.reader.dec_cipher = Some(dec_cipher);
342        self.enc_cipher = Some(enc_cipher);
343    }
344}
345
346async fn write_task(
347    mut network_packet_writer_rx: mpsc::UnboundedReceiver<Box<[u8]>>,
348    mut write_half: OwnedWriteHalf,
349) {
350    while let Some(network_packet) = network_packet_writer_rx.recv().await {
351        if let Err(e) = write_half.write_all(&network_packet).await {
352            debug!("Error writing packet to server: {e}");
353            break;
354        };
355    }
356
357    trace!("write task is done");
358}
359
360#[derive(Error, Debug)]
361pub enum WritePacketError {
362    #[error("Wrong protocol state: expected {expected:?}, got {got:?}")]
363    WrongState {
364        expected: ConnectionProtocol,
365        got: ConnectionProtocol,
366    },
367    #[error(transparent)]
368    Encoding(#[from] azalea_protocol::write::PacketEncodeError),
369    #[error(transparent)]
370    SendError {
371        #[from]
372        #[backtrace]
373        source: mpsc::error::SendError<Box<[u8]>>,
374    },
375}