azalea_client/
raw_connection.rs

1use std::fmt::Debug;
2use std::sync::Arc;
3
4use azalea_protocol::{
5    connect::{RawReadConnection, RawWriteConnection},
6    packets::{ConnectionProtocol, Packet, ProtocolPacket},
7    read::ReadPacketError,
8    write::serialize_packet,
9};
10use bevy_ecs::prelude::*;
11use parking_lot::Mutex;
12use thiserror::Error;
13use tokio::sync::mpsc::{
14    self,
15    error::{SendError, TrySendError},
16};
17use tracing::error;
18
19/// A component for clients that can read and write packets to the server. This
20/// works with raw bytes, so you'll have to serialize/deserialize packets
21/// yourself. It will do the compression and encryption for you though.
22#[derive(Component)]
23pub struct RawConnection {
24    pub reader: RawConnectionReader,
25    pub writer: RawConnectionWriter,
26
27    /// Packets sent to this will be sent to the server.
28    /// A task that reads packets from the server. The client is disconnected
29    /// when this task ends.
30    pub read_packets_task: tokio::task::JoinHandle<()>,
31    /// A task that writes packets from the server.
32    pub write_packets_task: tokio::task::JoinHandle<()>,
33
34    pub connection_protocol: ConnectionProtocol,
35}
36
37#[derive(Clone)]
38pub struct RawConnectionReader {
39    pub incoming_packet_queue: Arc<Mutex<Vec<Box<[u8]>>>>,
40    pub run_schedule_sender: mpsc::Sender<()>,
41}
42#[derive(Clone)]
43pub struct RawConnectionWriter {
44    pub outgoing_packets_sender: mpsc::UnboundedSender<Box<[u8]>>,
45}
46
47#[derive(Error, Debug)]
48pub enum WritePacketError {
49    #[error("Wrong protocol state: expected {expected:?}, got {got:?}")]
50    WrongState {
51        expected: ConnectionProtocol,
52        got: ConnectionProtocol,
53    },
54    #[error(transparent)]
55    Encoding(#[from] azalea_protocol::write::PacketEncodeError),
56    #[error(transparent)]
57    SendError {
58        #[from]
59        #[backtrace]
60        source: SendError<Box<[u8]>>,
61    },
62}
63
64impl RawConnection {
65    pub fn new(
66        run_schedule_sender: mpsc::Sender<()>,
67        connection_protocol: ConnectionProtocol,
68        raw_read_connection: RawReadConnection,
69        raw_write_connection: RawWriteConnection,
70    ) -> Self {
71        let (outgoing_packets_sender, outgoing_packets_receiver) = mpsc::unbounded_channel();
72
73        let incoming_packet_queue = Arc::new(Mutex::new(Vec::new()));
74
75        let reader = RawConnectionReader {
76            incoming_packet_queue: incoming_packet_queue.clone(),
77            run_schedule_sender,
78        };
79        let writer = RawConnectionWriter {
80            outgoing_packets_sender,
81        };
82
83        let read_packets_task = tokio::spawn(reader.clone().read_task(raw_read_connection));
84        let write_packets_task = tokio::spawn(
85            writer
86                .clone()
87                .write_task(raw_write_connection, outgoing_packets_receiver),
88        );
89
90        Self {
91            reader,
92            writer,
93            read_packets_task,
94            write_packets_task,
95            connection_protocol,
96        }
97    }
98
99    pub fn write_raw_packet(&self, raw_packet: Box<[u8]>) -> Result<(), WritePacketError> {
100        self.writer.outgoing_packets_sender.send(raw_packet)?;
101        Ok(())
102    }
103
104    /// Write the packet with the given state to the server.
105    ///
106    /// # Errors
107    ///
108    /// Returns an error if the packet is not valid for the current state, or if
109    /// encoding it failed somehow (like it's too big or something).
110    pub fn write_packet<P: ProtocolPacket + Debug>(
111        &self,
112        packet: impl Packet<P>,
113    ) -> Result<(), WritePacketError> {
114        let packet = packet.into_variant();
115        let raw_packet = serialize_packet(&packet)?;
116        self.write_raw_packet(raw_packet)?;
117
118        Ok(())
119    }
120
121    /// Returns whether the connection is still alive.
122    pub fn is_alive(&self) -> bool {
123        !self.read_packets_task.is_finished()
124    }
125
126    pub fn incoming_packet_queue(&self) -> Arc<Mutex<Vec<Box<[u8]>>>> {
127        self.reader.incoming_packet_queue.clone()
128    }
129
130    pub fn set_state(&mut self, connection_protocol: ConnectionProtocol) {
131        self.connection_protocol = connection_protocol;
132    }
133}
134
135impl RawConnectionReader {
136    /// Loop that reads from the connection and adds the packets to the queue +
137    /// runs the schedule.
138    pub async fn read_task(self, mut read_conn: RawReadConnection) {
139        fn log_for_error(error: &ReadPacketError) {
140            if !matches!(*error, ReadPacketError::ConnectionClosed) {
141                error!("Error reading packet from Client: {error:?}");
142            }
143        }
144
145        loop {
146            match read_conn.read().await {
147                Ok(raw_packet) => {
148                    let mut incoming_packet_queue = self.incoming_packet_queue.lock();
149
150                    incoming_packet_queue.push(raw_packet);
151                    // this makes it so packets received at the same time are guaranteed to be
152                    // handled in the same tick. this is also an attempt at making it so we can't
153                    // receive any packets in the ticks/updates after being disconnected.
154                    loop {
155                        let raw_packet = match read_conn.try_read() {
156                            Ok(p) => p,
157                            Err(err) => {
158                                log_for_error(&err);
159                                return;
160                            }
161                        };
162                        let Some(raw_packet) = raw_packet else { break };
163                        incoming_packet_queue.push(raw_packet);
164                    }
165
166                    // tell the client to run all the systems
167                    if self.run_schedule_sender.try_send(()) == Err(TrySendError::Closed(())) {
168                        // the client was dropped
169                        break;
170                    }
171                }
172                Err(err) => {
173                    log_for_error(&err);
174                    return;
175                }
176            }
177        }
178    }
179}
180
181impl RawConnectionWriter {
182    /// Consume the [`ServerboundGamePacket`] queue and actually write the
183    /// packets to the server. It's like this so writing packets doesn't need to
184    /// be awaited.
185    ///
186    /// [`ServerboundGamePacket`]: azalea_protocol::packets::game::ServerboundGamePacket
187    pub async fn write_task(
188        self,
189        mut write_conn: RawWriteConnection,
190        mut outgoing_packets_receiver: mpsc::UnboundedReceiver<Box<[u8]>>,
191    ) {
192        while let Some(raw_packet) = outgoing_packets_receiver.recv().await {
193            if let Err(err) = write_conn.write(&raw_packet).await {
194                error!("Disconnecting because we couldn't write a packet: {err}.");
195                break;
196            };
197        }
198        // receiver is automatically closed when it's dropped
199    }
200}
201
202impl Drop for RawConnection {
203    /// Stop every active task when this `RawConnection` is dropped.
204    fn drop(&mut self) {
205        self.read_packets_task.abort();
206        self.write_packets_task.abort();
207    }
208}