azalea_client/plugins/
connection.rs

1use std::{fmt::Debug, io::Cursor, mem, sync::Arc};
2
3use azalea_crypto::Aes128CfbEnc;
4use azalea_protocol::{
5    connect::{RawReadConnection, RawWriteConnection},
6    packets::{
7        ConnectionProtocol, Packet, ProtocolPacket, config::ClientboundConfigPacket,
8        game::ClientboundGamePacket, login::ClientboundLoginPacket,
9    },
10    read::{ReadPacketError, deserialize_packet},
11    write::serialize_packet,
12};
13use bevy_app::prelude::*;
14use bevy_ecs::prelude::*;
15use bevy_tasks::{IoTaskPool, futures_lite::future};
16use thiserror::Error;
17use tokio::{
18    io::AsyncWriteExt,
19    net::tcp::OwnedWriteHalf,
20    sync::mpsc::{self},
21};
22use tracing::{debug, error, info, trace};
23
24use super::packet::{
25    config::ReceiveConfigPacketEvent, game::ReceiveGamePacketEvent, login::ReceiveLoginPacketEvent,
26};
27use crate::packet::{config, game, login};
28
29pub struct ConnectionPlugin;
30impl Plugin for ConnectionPlugin {
31    fn build(&self, app: &mut App) {
32        app.add_systems(PreUpdate, (read_packets, poll_all_writer_tasks).chain());
33    }
34}
35
36pub fn read_packets(ecs: &mut World) {
37    // receive_game_packet_events: EventWriter<ReceiveGamePacketEvent>,
38    let mut entity_and_conn_query = ecs.query::<(Entity, &mut RawConnection)>();
39    let mut conn_query = ecs.query::<&mut RawConnection>();
40
41    let mut entities_handling_packets = Vec::new();
42    let mut entities_with_injected_packets = Vec::new();
43    for (entity, mut raw_conn) in entity_and_conn_query.iter_mut(ecs) {
44        if !raw_conn.injected_clientbound_packets.is_empty() {
45            entities_with_injected_packets.push((
46                entity,
47                mem::take(&mut raw_conn.injected_clientbound_packets),
48            ));
49        }
50
51        if raw_conn.network.is_none() {
52            // no network connection, don't bother with the normal packet handling
53            continue;
54        }
55
56        entities_handling_packets.push(entity);
57    }
58
59    let mut queued_packet_events = QueuedPacketEvents::default();
60
61    // handle injected packets, see the comment on
62    // RawConnection::injected_clientbound_packets for more info
63    for (entity, raw_packets) in entities_with_injected_packets {
64        for raw_packet in raw_packets {
65            let conn = conn_query.get(ecs, entity).unwrap();
66            let state = conn.state;
67
68            trace!("Received injected packet with bytes: {raw_packet:?}");
69            if let Err(e) =
70                handle_raw_packet(ecs, &raw_packet, entity, state, &mut queued_packet_events)
71            {
72                error!("Error reading injected packet: {e}");
73            }
74        }
75    }
76
77    for entity in entities_handling_packets {
78        loop {
79            let mut conn = conn_query.get_mut(ecs, entity).unwrap();
80            let net_conn = conn.net_conn().unwrap();
81            let read_res = net_conn.reader.try_read();
82            let state = conn.state;
83            match read_res {
84                Ok(Some(raw_packet)) => {
85                    let raw_packet = Arc::<[u8]>::from(raw_packet);
86                    if let Err(e) = handle_raw_packet(
87                        ecs,
88                        &raw_packet,
89                        entity,
90                        state,
91                        &mut queued_packet_events,
92                    ) {
93                        error!("Error reading packet: {e}");
94                    }
95                }
96                Ok(None) => {
97                    // no packets available
98                    break;
99                }
100                Err(err) => {
101                    log_for_error(&err);
102
103                    if matches!(
104                        &*err,
105                        ReadPacketError::IoError { .. } | ReadPacketError::ConnectionClosed
106                    ) {
107                        info!("Server closed connection");
108                        // ungraceful disconnect :(
109                        conn.network = None;
110                        // setting this will make us send a DisconnectEvent
111                        conn.is_alive = false;
112                    }
113
114                    break;
115                }
116            }
117        }
118    }
119
120    queued_packet_events.send_events(ecs);
121}
122
123fn poll_all_writer_tasks(mut conn_query: Query<&mut RawConnection>) {
124    for mut conn in conn_query.iter_mut() {
125        if let Some(net_conn) = &mut conn.network {
126            // this needs to be done at some point every update to make sure packets are
127            // actually sent to the network
128
129            if net_conn.poll_writer().is_some() {
130                // means the writer task ended
131                conn.network = None;
132                conn.is_alive = false;
133            }
134        }
135    }
136}
137
138#[derive(Default)]
139pub struct QueuedPacketEvents {
140    login: Vec<ReceiveLoginPacketEvent>,
141    config: Vec<ReceiveConfigPacketEvent>,
142    game: Vec<ReceiveGamePacketEvent>,
143}
144impl QueuedPacketEvents {
145    fn send_events(&mut self, ecs: &mut World) {
146        ecs.send_event_batch(self.login.drain(..));
147        ecs.send_event_batch(self.config.drain(..));
148        ecs.send_event_batch(self.game.drain(..));
149    }
150}
151
152fn log_for_error(error: &ReadPacketError) {
153    if !matches!(*error, ReadPacketError::ConnectionClosed) {
154        error!("Error reading packet from Client: {error:?}");
155    }
156}
157
158/// The client's connection to the server.
159#[derive(Component)]
160pub struct RawConnection {
161    /// The network connection to the server.
162    ///
163    /// This isn't guaranteed to be present, for example during the main packet
164    /// handlers or at all times during tests.
165    ///
166    /// You shouldn't rely on this. Instead, use the events for sending packets
167    /// like [`SendPacketEvent`](crate::packet::game::SendPacketEvent) /
168    /// [`SendConfigPacketEvent`](crate::packet::config::SendConfigPacketEvent)
169    /// / [`SendLoginPacketEvent`](crate::packet::login::SendLoginPacketEvent).
170    ///
171    /// To check if we haven't disconnected from the server, use
172    /// [`Self::is_alive`].
173    network: Option<NetworkConnection>,
174    pub state: ConnectionProtocol,
175    is_alive: bool,
176
177    /// This exists for internal testing purposes and probably shouldn't be used
178    /// for normal bots. It's basically a way to make our client think it
179    /// received a packet from the server without needing to interact with the
180    /// network.
181    pub injected_clientbound_packets: Vec<Box<[u8]>>,
182}
183impl RawConnection {
184    pub fn new(
185        reader: RawReadConnection,
186        writer: RawWriteConnection,
187        state: ConnectionProtocol,
188    ) -> Self {
189        let task_pool = IoTaskPool::get();
190
191        let (network_packet_writer_tx, network_packet_writer_rx) =
192            mpsc::unbounded_channel::<Box<[u8]>>();
193
194        let writer_task =
195            task_pool.spawn(write_task(network_packet_writer_rx, writer.write_stream));
196
197        let mut conn = Self::new_networkless(state);
198        conn.network = Some(NetworkConnection {
199            reader,
200            enc_cipher: writer.enc_cipher,
201            network_packet_writer_tx,
202            writer_task,
203        });
204
205        conn
206    }
207
208    pub fn new_networkless(state: ConnectionProtocol) -> Self {
209        Self {
210            network: None,
211            state,
212            is_alive: true,
213            injected_clientbound_packets: Vec::new(),
214        }
215    }
216
217    pub fn is_alive(&self) -> bool {
218        self.is_alive
219    }
220
221    /// Write a packet to the server without emitting any events.
222    ///
223    /// This is called by the handlers for [`SendPacketEvent`],
224    /// [`SendConfigPacketEvent`], and [`SendLoginPacketEvent`].
225    ///
226    /// [`SendPacketEvent`]: crate::packet::game::SendPacketEvent
227    /// [`SendConfigPacketEvent`]: crate::packet::config::SendConfigPacketEvent
228    /// [`SendLoginPacketEvent`]: crate::packet::login::SendLoginPacketEvent
229    pub fn write<P: ProtocolPacket + Debug>(
230        &mut self,
231        packet: impl Packet<P>,
232    ) -> Result<(), WritePacketError> {
233        if let Some(network) = &mut self.network {
234            network.write(packet)?;
235        } else {
236            debug!(
237                "tried to write packet to the network but there is no NetworkConnection. if you're trying to send a packet from the handler function, use self.write instead"
238            );
239        }
240        Ok(())
241    }
242
243    pub fn net_conn(&mut self) -> Option<&mut NetworkConnection> {
244        self.network.as_mut()
245    }
246}
247
248pub fn handle_raw_packet(
249    ecs: &mut World,
250    raw_packet: &[u8],
251    entity: Entity,
252    state: ConnectionProtocol,
253    queued_packet_events: &mut QueuedPacketEvents,
254) -> Result<(), Box<ReadPacketError>> {
255    let stream = &mut Cursor::new(raw_packet);
256    match state {
257        ConnectionProtocol::Handshake => {
258            unreachable!()
259        }
260        ConnectionProtocol::Game => {
261            let packet = Arc::new(deserialize_packet::<ClientboundGamePacket>(stream)?);
262            trace!("Packet: {packet:?}");
263            game::process_packet(ecs, entity, packet.as_ref());
264            queued_packet_events
265                .game
266                .push(ReceiveGamePacketEvent { entity, packet });
267        }
268        ConnectionProtocol::Status => {
269            unreachable!()
270        }
271        ConnectionProtocol::Login => {
272            let packet = Arc::new(deserialize_packet::<ClientboundLoginPacket>(stream)?);
273            trace!("Packet: {packet:?}");
274            login::process_packet(ecs, entity, &packet);
275            queued_packet_events
276                .login
277                .push(ReceiveLoginPacketEvent { entity, packet });
278        }
279        ConnectionProtocol::Configuration => {
280            let packet = Arc::new(deserialize_packet::<ClientboundConfigPacket>(stream)?);
281            trace!("Packet: {packet:?}");
282            config::process_packet(ecs, entity, &packet);
283            queued_packet_events
284                .config
285                .push(ReceiveConfigPacketEvent { entity, packet });
286        }
287    };
288
289    Ok(())
290}
291
292pub struct NetworkConnection {
293    reader: RawReadConnection,
294    // compression threshold is in the RawReadConnection
295    pub enc_cipher: Option<Aes128CfbEnc>,
296
297    pub writer_task: bevy_tasks::Task<()>,
298    /// A queue of raw TCP packets to send. These will not be modified further,
299    /// they should already be serialized and encrypted and everything before
300    /// being added here.
301    network_packet_writer_tx: mpsc::UnboundedSender<Box<[u8]>>,
302}
303impl NetworkConnection {
304    pub fn write<P: ProtocolPacket + Debug>(
305        &mut self,
306        packet: impl Packet<P>,
307    ) -> Result<(), WritePacketError> {
308        let packet = packet.into_variant();
309        let raw_packet = serialize_packet(&packet)?;
310        self.write_raw(&raw_packet)?;
311
312        Ok(())
313    }
314
315    pub fn write_raw(&mut self, raw_packet: &[u8]) -> Result<(), WritePacketError> {
316        let network_packet = azalea_protocol::write::encode_to_network_packet(
317            raw_packet,
318            self.reader.compression_threshold,
319            &mut self.enc_cipher,
320        );
321        self.network_packet_writer_tx
322            .send(network_packet.into_boxed_slice())?;
323        Ok(())
324    }
325
326    /// Makes sure packets get sent and returns Some(()) if the connection has
327    /// closed.
328    pub fn poll_writer(&mut self) -> Option<()> {
329        let poll_once_res = future::poll_once(&mut self.writer_task);
330        future::block_on(poll_once_res)
331    }
332
333    pub fn set_compression_threshold(&mut self, threshold: Option<u32>) {
334        trace!("Set compression threshold to {threshold:?}");
335        self.reader.compression_threshold = threshold;
336    }
337    /// Set the encryption key that is used to encrypt and decrypt packets. It's
338    /// the same for both reading and writing.
339    pub fn set_encryption_key(&mut self, key: [u8; 16]) {
340        trace!("Enabled protocol encryption");
341        let (enc_cipher, dec_cipher) = azalea_crypto::create_cipher(&key);
342        self.reader.dec_cipher = Some(dec_cipher);
343        self.enc_cipher = Some(enc_cipher);
344    }
345}
346
347async fn write_task(
348    mut network_packet_writer_rx: mpsc::UnboundedReceiver<Box<[u8]>>,
349    mut write_half: OwnedWriteHalf,
350) {
351    while let Some(network_packet) = network_packet_writer_rx.recv().await {
352        if let Err(e) = write_half.write_all(&network_packet).await {
353            debug!("Error writing packet to server: {e}");
354            break;
355        };
356    }
357
358    trace!("write task is done");
359}
360
/// Errors that can be returned when writing a packet to a connection.
#[derive(Error, Debug)]
pub enum WritePacketError {
    /// The protocol state was `got` but `expected` was required.
    // NOTE(review): nothing in this part of the file constructs this variant;
    // presumably the packet-sending code elsewhere does -- confirm.
    #[error("Wrong protocol state: expected {expected:?}, got {got:?}")]
    WrongState {
        expected: ConnectionProtocol,
        got: ConnectionProtocol,
    },
    /// The packet could not be serialized (see `NetworkConnection::write`).
    #[error(transparent)]
    Encoding(#[from] azalea_protocol::write::PacketEncodeError),
    /// The encoded packet could not be sent over the channel to the writer
    /// task (see `NetworkConnection::write_raw`).
    #[error(transparent)]
    SendError {
        #[from]
        #[backtrace]
        source: mpsc::error::SendError<Box<[u8]>>,
    },
}
376}