azalea_client/
raw_connection.rs

1use std::fmt::Debug;
2use std::sync::Arc;
3
4use azalea_protocol::{
5    connect::{RawReadConnection, RawWriteConnection},
6    packets::{ConnectionProtocol, Packet, ProtocolPacket},
7    read::ReadPacketError,
8    write::serialize_packet,
9};
10use bevy_ecs::prelude::*;
11use parking_lot::Mutex;
12use thiserror::Error;
13use tokio::sync::mpsc::{self, error::SendError};
14use tracing::error;
15
/// A component for clients that can read and write packets to the server. This
/// works with raw bytes, so you'll have to serialize/deserialize packets
/// yourself. It will do the compression and encryption for you though.
#[derive(Component)]
pub struct RawConnection {
    /// Handle for the incoming side: the shared queue that received raw
    /// packets are pushed onto, plus the channel used to request a schedule
    /// run.
    pub reader: RawConnectionReader,
    /// Handle for the outgoing side: raw packets sent through it are picked up
    /// by the write task.
    pub writer: RawConnectionWriter,

    /// A task that reads packets from the server.
    /// [`RawConnection::is_alive`] reports the connection as dead once this
    /// task has finished.
    pub read_packets_task: tokio::task::JoinHandle<()>,
    /// A task that writes packets to the server.
    pub write_packets_task: tokio::task::JoinHandle<()>,

    /// The protocol state recorded for this connection; replaced through
    /// [`RawConnection::set_state`].
    pub connection_protocol: ConnectionProtocol,
}
33
/// The reading half of a [`RawConnection`]. Clones share the same packet queue
/// and the same schedule channel.
#[derive(Clone)]
pub struct RawConnectionReader {
    /// Raw packets pushed by [`RawConnectionReader::read_task`], in the order
    /// they were read from the connection.
    pub incoming_packet_queue: Arc<Mutex<Vec<Box<[u8]>>>>,
    /// Receives one `()` after every packet that is pushed onto the queue, to
    /// tell the client to run its systems.
    pub run_schedule_sender: mpsc::UnboundedSender<()>,
}
/// The writing half of a [`RawConnection`]. Clones send into the same
/// outgoing channel.
#[derive(Clone)]
pub struct RawConnectionWriter {
    /// Sending side of the outgoing channel; the receiving side is the one
    /// handed to [`RawConnectionWriter::write_task`], which writes every raw
    /// packet it receives to the connection.
    pub outgoing_packets_sender: mpsc::UnboundedSender<Box<[u8]>>,
}
43
/// Errors returned when queueing a packet to be written to the server.
#[derive(Error, Debug)]
pub enum WritePacketError {
    /// The packet's protocol state does not match the expected one.
    // NOTE(review): nothing visible in this file constructs this variant;
    // confirm whether a caller elsewhere still produces it.
    #[error("Wrong protocol state: expected {expected:?}, got {got:?}")]
    WrongState {
        expected: ConnectionProtocol,
        got: ConnectionProtocol,
    },
    /// Serializing the packet into raw bytes failed.
    #[error(transparent)]
    Encoding(#[from] azalea_protocol::write::PacketEncodeError),
    /// The raw packet could not be handed to the outgoing channel because its
    /// receiving side is gone; the unsent bytes are carried inside `source`.
    #[error(transparent)]
    SendError {
        #[from]
        #[backtrace]
        source: SendError<Box<[u8]>>,
    },
}
60
61impl RawConnection {
62    pub fn new(
63        run_schedule_sender: mpsc::UnboundedSender<()>,
64        connection_protocol: ConnectionProtocol,
65        raw_read_connection: RawReadConnection,
66        raw_write_connection: RawWriteConnection,
67    ) -> Self {
68        let (outgoing_packets_sender, outgoing_packets_receiver) = mpsc::unbounded_channel();
69
70        let incoming_packet_queue = Arc::new(Mutex::new(Vec::new()));
71
72        let reader = RawConnectionReader {
73            incoming_packet_queue: incoming_packet_queue.clone(),
74            run_schedule_sender,
75        };
76        let writer = RawConnectionWriter {
77            outgoing_packets_sender,
78        };
79
80        let read_packets_task = tokio::spawn(reader.clone().read_task(raw_read_connection));
81        let write_packets_task = tokio::spawn(
82            writer
83                .clone()
84                .write_task(raw_write_connection, outgoing_packets_receiver),
85        );
86
87        Self {
88            reader,
89            writer,
90            read_packets_task,
91            write_packets_task,
92            connection_protocol,
93        }
94    }
95
96    pub fn write_raw_packet(&self, raw_packet: Box<[u8]>) -> Result<(), WritePacketError> {
97        self.writer.outgoing_packets_sender.send(raw_packet)?;
98        Ok(())
99    }
100
101    /// Write the packet with the given state to the server.
102    ///
103    /// # Errors
104    ///
105    /// Returns an error if the packet is not valid for the current state, or if
106    /// encoding it failed somehow (like it's too big or something).
107    pub fn write_packet<P: ProtocolPacket + Debug>(
108        &self,
109        packet: impl Packet<P>,
110    ) -> Result<(), WritePacketError> {
111        let packet = packet.into_variant();
112        let raw_packet = serialize_packet(&packet)?;
113        self.write_raw_packet(raw_packet)?;
114
115        Ok(())
116    }
117
118    /// Returns whether the connection is still alive.
119    pub fn is_alive(&self) -> bool {
120        !self.read_packets_task.is_finished()
121    }
122
123    pub fn incoming_packet_queue(&self) -> Arc<Mutex<Vec<Box<[u8]>>>> {
124        self.reader.incoming_packet_queue.clone()
125    }
126
127    pub fn set_state(&mut self, connection_protocol: ConnectionProtocol) {
128        self.connection_protocol = connection_protocol;
129    }
130}
131
132impl RawConnectionReader {
133    /// Loop that reads from the connection and adds the packets to the queue +
134    /// runs the schedule.
135    pub async fn read_task(self, mut read_conn: RawReadConnection) {
136        loop {
137            match read_conn.read().await {
138                Ok(raw_packet) => {
139                    self.incoming_packet_queue.lock().push(raw_packet);
140                    // tell the client to run all the systems
141                    if self.run_schedule_sender.send(()).is_err() {
142                        // the client was dropped
143                        break;
144                    }
145                }
146                Err(error) => {
147                    if !matches!(*error, ReadPacketError::ConnectionClosed) {
148                        error!("Error reading packet from Client: {error:?}");
149                    }
150                    break;
151                }
152            }
153        }
154    }
155}
156
157impl RawConnectionWriter {
158    /// Consume the [`ServerboundGamePacket`] queue and actually write the
159    /// packets to the server. It's like this so writing packets doesn't need to
160    /// be awaited.
161    ///
162    /// [`ServerboundGamePacket`]: azalea_protocol::packets::game::ServerboundGamePacket
163    pub async fn write_task(
164        self,
165        mut write_conn: RawWriteConnection,
166        mut outgoing_packets_receiver: mpsc::UnboundedReceiver<Box<[u8]>>,
167    ) {
168        while let Some(raw_packet) = outgoing_packets_receiver.recv().await {
169            if let Err(err) = write_conn.write(&raw_packet).await {
170                error!("Disconnecting because we couldn't write a packet: {err}.");
171                break;
172            };
173        }
174        // receiver is automatically closed when it's dropped
175    }
176}
177
178impl Drop for RawConnection {
179    /// Stop every active task when this `RawConnection` is dropped.
180    fn drop(&mut self) {
181        self.read_packets_task.abort();
182        self.write_packets_task.abort();
183    }
184}