azalea_client/
raw_connection.rs1use std::fmt::Debug;
2use std::sync::Arc;
3
4use azalea_protocol::{
5 connect::{RawReadConnection, RawWriteConnection},
6 packets::{ConnectionProtocol, Packet, ProtocolPacket},
7 read::ReadPacketError,
8 write::serialize_packet,
9};
10use bevy_ecs::prelude::*;
11use parking_lot::Mutex;
12use thiserror::Error;
13use tokio::sync::mpsc::{self, error::SendError};
14use tracing::error;
15
16#[derive(Component)]
20pub struct RawConnection {
21 pub reader: RawConnectionReader,
22 pub writer: RawConnectionWriter,
23
24 pub read_packets_task: tokio::task::JoinHandle<()>,
28 pub write_packets_task: tokio::task::JoinHandle<()>,
30
31 pub connection_protocol: ConnectionProtocol,
32}
33
34#[derive(Clone)]
35pub struct RawConnectionReader {
36 pub incoming_packet_queue: Arc<Mutex<Vec<Box<[u8]>>>>,
37 pub run_schedule_sender: mpsc::UnboundedSender<()>,
38}
39#[derive(Clone)]
40pub struct RawConnectionWriter {
41 pub outgoing_packets_sender: mpsc::UnboundedSender<Box<[u8]>>,
42}
43
44#[derive(Error, Debug)]
45pub enum WritePacketError {
46 #[error("Wrong protocol state: expected {expected:?}, got {got:?}")]
47 WrongState {
48 expected: ConnectionProtocol,
49 got: ConnectionProtocol,
50 },
51 #[error(transparent)]
52 Encoding(#[from] azalea_protocol::write::PacketEncodeError),
53 #[error(transparent)]
54 SendError {
55 #[from]
56 #[backtrace]
57 source: SendError<Box<[u8]>>,
58 },
59}
60
61impl RawConnection {
62 pub fn new(
63 run_schedule_sender: mpsc::UnboundedSender<()>,
64 connection_protocol: ConnectionProtocol,
65 raw_read_connection: RawReadConnection,
66 raw_write_connection: RawWriteConnection,
67 ) -> Self {
68 let (outgoing_packets_sender, outgoing_packets_receiver) = mpsc::unbounded_channel();
69
70 let incoming_packet_queue = Arc::new(Mutex::new(Vec::new()));
71
72 let reader = RawConnectionReader {
73 incoming_packet_queue: incoming_packet_queue.clone(),
74 run_schedule_sender,
75 };
76 let writer = RawConnectionWriter {
77 outgoing_packets_sender,
78 };
79
80 let read_packets_task = tokio::spawn(reader.clone().read_task(raw_read_connection));
81 let write_packets_task = tokio::spawn(
82 writer
83 .clone()
84 .write_task(raw_write_connection, outgoing_packets_receiver),
85 );
86
87 Self {
88 reader,
89 writer,
90 read_packets_task,
91 write_packets_task,
92 connection_protocol,
93 }
94 }
95
96 pub fn write_raw_packet(&self, raw_packet: Box<[u8]>) -> Result<(), WritePacketError> {
97 self.writer.outgoing_packets_sender.send(raw_packet)?;
98 Ok(())
99 }
100
101 pub fn write_packet<P: ProtocolPacket + Debug>(
108 &self,
109 packet: impl Packet<P>,
110 ) -> Result<(), WritePacketError> {
111 let packet = packet.into_variant();
112 let raw_packet = serialize_packet(&packet)?;
113 self.write_raw_packet(raw_packet)?;
114
115 Ok(())
116 }
117
118 pub fn is_alive(&self) -> bool {
120 !self.read_packets_task.is_finished()
121 }
122
123 pub fn incoming_packet_queue(&self) -> Arc<Mutex<Vec<Box<[u8]>>>> {
124 self.reader.incoming_packet_queue.clone()
125 }
126
127 pub fn set_state(&mut self, connection_protocol: ConnectionProtocol) {
128 self.connection_protocol = connection_protocol;
129 }
130}
131
132impl RawConnectionReader {
133 pub async fn read_task(self, mut read_conn: RawReadConnection) {
136 loop {
137 match read_conn.read().await {
138 Ok(raw_packet) => {
139 self.incoming_packet_queue.lock().push(raw_packet);
140 if self.run_schedule_sender.send(()).is_err() {
142 break;
144 }
145 }
146 Err(error) => {
147 if !matches!(*error, ReadPacketError::ConnectionClosed) {
148 error!("Error reading packet from Client: {error:?}");
149 }
150 break;
151 }
152 }
153 }
154 }
155}
156
157impl RawConnectionWriter {
158 pub async fn write_task(
164 self,
165 mut write_conn: RawWriteConnection,
166 mut outgoing_packets_receiver: mpsc::UnboundedReceiver<Box<[u8]>>,
167 ) {
168 while let Some(raw_packet) = outgoing_packets_receiver.recv().await {
169 if let Err(err) = write_conn.write(&raw_packet).await {
170 error!("Disconnecting because we couldn't write a packet: {err}.");
171 break;
172 };
173 }
174 }
176}
177
178impl Drop for RawConnection {
179 fn drop(&mut self) {
181 self.read_packets_task.abort();
182 self.write_packets_task.abort();
183 }
184}