azalea_client/
raw_connection.rs1use std::fmt::Debug;
2use std::sync::Arc;
3
4use azalea_protocol::{
5 connect::{RawReadConnection, RawWriteConnection},
6 packets::{ConnectionProtocol, Packet, ProtocolPacket},
7 read::ReadPacketError,
8 write::serialize_packet,
9};
10use bevy_ecs::prelude::*;
11use parking_lot::Mutex;
12use thiserror::Error;
13use tokio::sync::mpsc::{
14 self,
15 error::{SendError, TrySendError},
16};
17use tracing::error;
18
19#[derive(Component)]
23pub struct RawConnection {
24 pub reader: RawConnectionReader,
25 pub writer: RawConnectionWriter,
26
27 pub read_packets_task: tokio::task::JoinHandle<()>,
31 pub write_packets_task: tokio::task::JoinHandle<()>,
33
34 pub connection_protocol: ConnectionProtocol,
35}
36
37#[derive(Clone)]
38pub struct RawConnectionReader {
39 pub incoming_packet_queue: Arc<Mutex<Vec<Box<[u8]>>>>,
40 pub run_schedule_sender: mpsc::Sender<()>,
41}
42#[derive(Clone)]
43pub struct RawConnectionWriter {
44 pub outgoing_packets_sender: mpsc::UnboundedSender<Box<[u8]>>,
45}
46
47#[derive(Error, Debug)]
48pub enum WritePacketError {
49 #[error("Wrong protocol state: expected {expected:?}, got {got:?}")]
50 WrongState {
51 expected: ConnectionProtocol,
52 got: ConnectionProtocol,
53 },
54 #[error(transparent)]
55 Encoding(#[from] azalea_protocol::write::PacketEncodeError),
56 #[error(transparent)]
57 SendError {
58 #[from]
59 #[backtrace]
60 source: SendError<Box<[u8]>>,
61 },
62}
63
64impl RawConnection {
65 pub fn new(
66 run_schedule_sender: mpsc::Sender<()>,
67 connection_protocol: ConnectionProtocol,
68 raw_read_connection: RawReadConnection,
69 raw_write_connection: RawWriteConnection,
70 ) -> Self {
71 let (outgoing_packets_sender, outgoing_packets_receiver) = mpsc::unbounded_channel();
72
73 let incoming_packet_queue = Arc::new(Mutex::new(Vec::new()));
74
75 let reader = RawConnectionReader {
76 incoming_packet_queue: incoming_packet_queue.clone(),
77 run_schedule_sender,
78 };
79 let writer = RawConnectionWriter {
80 outgoing_packets_sender,
81 };
82
83 let read_packets_task = tokio::spawn(reader.clone().read_task(raw_read_connection));
84 let write_packets_task = tokio::spawn(
85 writer
86 .clone()
87 .write_task(raw_write_connection, outgoing_packets_receiver),
88 );
89
90 Self {
91 reader,
92 writer,
93 read_packets_task,
94 write_packets_task,
95 connection_protocol,
96 }
97 }
98
99 pub fn write_raw_packet(&self, raw_packet: Box<[u8]>) -> Result<(), WritePacketError> {
100 self.writer.outgoing_packets_sender.send(raw_packet)?;
101 Ok(())
102 }
103
104 pub fn write_packet<P: ProtocolPacket + Debug>(
111 &self,
112 packet: impl Packet<P>,
113 ) -> Result<(), WritePacketError> {
114 let packet = packet.into_variant();
115 let raw_packet = serialize_packet(&packet)?;
116 self.write_raw_packet(raw_packet)?;
117
118 Ok(())
119 }
120
121 pub fn is_alive(&self) -> bool {
123 !self.read_packets_task.is_finished()
124 }
125
126 pub fn incoming_packet_queue(&self) -> Arc<Mutex<Vec<Box<[u8]>>>> {
127 self.reader.incoming_packet_queue.clone()
128 }
129
130 pub fn set_state(&mut self, connection_protocol: ConnectionProtocol) {
131 self.connection_protocol = connection_protocol;
132 }
133}
134
135impl RawConnectionReader {
136 pub async fn read_task(self, mut read_conn: RawReadConnection) {
139 fn log_for_error(error: &ReadPacketError) {
140 if !matches!(*error, ReadPacketError::ConnectionClosed) {
141 error!("Error reading packet from Client: {error:?}");
142 }
143 }
144
145 loop {
146 match read_conn.read().await {
147 Ok(raw_packet) => {
148 let mut incoming_packet_queue = self.incoming_packet_queue.lock();
149
150 incoming_packet_queue.push(raw_packet);
151 loop {
155 let raw_packet = match read_conn.try_read() {
156 Ok(p) => p,
157 Err(err) => {
158 log_for_error(&err);
159 return;
160 }
161 };
162 let Some(raw_packet) = raw_packet else { break };
163 incoming_packet_queue.push(raw_packet);
164 }
165
166 if self.run_schedule_sender.try_send(()) == Err(TrySendError::Closed(())) {
168 break;
170 }
171 }
172 Err(err) => {
173 log_for_error(&err);
174 return;
175 }
176 }
177 }
178 }
179}
180
181impl RawConnectionWriter {
182 pub async fn write_task(
188 self,
189 mut write_conn: RawWriteConnection,
190 mut outgoing_packets_receiver: mpsc::UnboundedReceiver<Box<[u8]>>,
191 ) {
192 while let Some(raw_packet) = outgoing_packets_receiver.recv().await {
193 if let Err(err) = write_conn.write(&raw_packet).await {
194 error!("Disconnecting because we couldn't write a packet: {err}.");
195 break;
196 };
197 }
198 }
200}
201
202impl Drop for RawConnection {
203 fn drop(&mut self) {
205 self.read_packets_task.abort();
206 self.write_packets_task.abort();
207 }
208}