1use std::{fmt::Debug, io::Cursor, mem, sync::Arc};
2
3use azalea_crypto::Aes128CfbEnc;
4use azalea_protocol::{
5 connect::{RawReadConnection, RawWriteConnection},
6 packets::{
7 ConnectionProtocol, Packet, ProtocolPacket, config::ClientboundConfigPacket,
8 game::ClientboundGamePacket, login::ClientboundLoginPacket,
9 },
10 read::{ReadPacketError, deserialize_packet},
11 write::serialize_packet,
12};
13use bevy_app::prelude::*;
14use bevy_ecs::prelude::*;
15use bevy_tasks::{IoTaskPool, futures_lite::future};
16use thiserror::Error;
17use tokio::{
18 io::AsyncWriteExt,
19 net::tcp::OwnedWriteHalf,
20 sync::mpsc::{self},
21};
22use tracing::{debug, error, info, trace};
23
24use super::packet::{
25 config::ReceiveConfigPacketEvent, game::ReceiveGamePacketEvent, login::ReceiveLoginPacketEvent,
26};
27use crate::packet::{config, game, login};
28
29pub struct ConnectionPlugin;
30impl Plugin for ConnectionPlugin {
31 fn build(&self, app: &mut App) {
32 app.add_systems(PreUpdate, (read_packets, poll_all_writer_tasks).chain());
33 }
34}
35
36pub fn read_packets(ecs: &mut World) {
37 let mut entity_and_conn_query = ecs.query::<(Entity, &mut RawConnection)>();
39 let mut conn_query = ecs.query::<&mut RawConnection>();
40
41 let mut entities_handling_packets = Vec::new();
42 let mut entities_with_injected_packets = Vec::new();
43 for (entity, mut raw_conn) in entity_and_conn_query.iter_mut(ecs) {
44 if !raw_conn.injected_clientbound_packets.is_empty() {
45 entities_with_injected_packets.push((
46 entity,
47 mem::take(&mut raw_conn.injected_clientbound_packets),
48 ));
49 }
50
51 if raw_conn.network.is_none() {
52 continue;
54 }
55
56 entities_handling_packets.push(entity);
57 }
58
59 let mut queued_packet_events = QueuedPacketEvents::default();
60
61 for (entity, raw_packets) in entities_with_injected_packets {
64 for raw_packet in raw_packets {
65 let conn = conn_query.get(ecs, entity).unwrap();
66 let state = conn.state;
67
68 trace!("Received injected packet with bytes: {raw_packet:?}");
69 if let Err(e) =
70 handle_raw_packet(ecs, &raw_packet, entity, state, &mut queued_packet_events)
71 {
72 error!("Error reading injected packet: {e}");
73 }
74 }
75 }
76
77 for entity in entities_handling_packets {
78 loop {
79 let mut conn = conn_query.get_mut(ecs, entity).unwrap();
80 let net_conn = conn.net_conn().unwrap();
81 let read_res = net_conn.reader.try_read();
82 let state = conn.state;
83 match read_res {
84 Ok(Some(raw_packet)) => {
85 let raw_packet = Arc::<[u8]>::from(raw_packet);
86 if let Err(e) = handle_raw_packet(
87 ecs,
88 &raw_packet,
89 entity,
90 state,
91 &mut queued_packet_events,
92 ) {
93 error!("Error reading packet: {e}");
94 }
95 }
96 Ok(None) => {
97 break;
99 }
100 Err(err) => {
101 log_for_error(&err);
102
103 if matches!(
104 &*err,
105 ReadPacketError::IoError { .. } | ReadPacketError::ConnectionClosed
106 ) {
107 info!("Server closed connection");
108 conn.network = None;
110 conn.is_alive = false;
112 }
113
114 break;
115 }
116 }
117 }
118 }
119
120 queued_packet_events.send_events(ecs);
121}
122
123fn poll_all_writer_tasks(mut conn_query: Query<&mut RawConnection>) {
124 for mut conn in conn_query.iter_mut() {
125 if let Some(net_conn) = &mut conn.network {
126 if net_conn.poll_writer().is_some() {
130 conn.network = None;
132 conn.is_alive = false;
133 }
134 }
135 }
136}
137
138#[derive(Default)]
139pub struct QueuedPacketEvents {
140 login: Vec<ReceiveLoginPacketEvent>,
141 config: Vec<ReceiveConfigPacketEvent>,
142 game: Vec<ReceiveGamePacketEvent>,
143}
144impl QueuedPacketEvents {
145 fn send_events(&mut self, ecs: &mut World) {
146 ecs.send_event_batch(self.login.drain(..));
147 ecs.send_event_batch(self.config.drain(..));
148 ecs.send_event_batch(self.game.drain(..));
149 }
150}
151
152fn log_for_error(error: &ReadPacketError) {
153 if !matches!(*error, ReadPacketError::ConnectionClosed) {
154 error!("Error reading packet from Client: {error:?}");
155 }
156}
157
158#[derive(Component)]
160pub struct RawConnection {
161 network: Option<NetworkConnection>,
174 pub state: ConnectionProtocol,
175 is_alive: bool,
176
177 pub injected_clientbound_packets: Vec<Box<[u8]>>,
182}
183impl RawConnection {
184 pub fn new(
185 reader: RawReadConnection,
186 writer: RawWriteConnection,
187 state: ConnectionProtocol,
188 ) -> Self {
189 let task_pool = IoTaskPool::get();
190
191 let (network_packet_writer_tx, network_packet_writer_rx) =
192 mpsc::unbounded_channel::<Box<[u8]>>();
193
194 let writer_task =
195 task_pool.spawn(write_task(network_packet_writer_rx, writer.write_stream));
196
197 let mut conn = Self::new_networkless(state);
198 conn.network = Some(NetworkConnection {
199 reader,
200 enc_cipher: writer.enc_cipher,
201 network_packet_writer_tx,
202 writer_task,
203 });
204
205 conn
206 }
207
208 pub fn new_networkless(state: ConnectionProtocol) -> Self {
209 Self {
210 network: None,
211 state,
212 is_alive: true,
213 injected_clientbound_packets: Vec::new(),
214 }
215 }
216
217 pub fn is_alive(&self) -> bool {
218 self.is_alive
219 }
220
221 pub fn write<P: ProtocolPacket + Debug>(
230 &mut self,
231 packet: impl Packet<P>,
232 ) -> Result<(), WritePacketError> {
233 if let Some(network) = &mut self.network {
234 network.write(packet)?;
235 } else {
236 debug!(
237 "tried to write packet to the network but there is no NetworkConnection. if you're trying to send a packet from the handler function, use self.write instead"
238 );
239 }
240 Ok(())
241 }
242
243 pub fn net_conn(&mut self) -> Option<&mut NetworkConnection> {
244 self.network.as_mut()
245 }
246}
247
248pub fn handle_raw_packet(
249 ecs: &mut World,
250 raw_packet: &[u8],
251 entity: Entity,
252 state: ConnectionProtocol,
253 queued_packet_events: &mut QueuedPacketEvents,
254) -> Result<(), Box<ReadPacketError>> {
255 let stream = &mut Cursor::new(raw_packet);
256 match state {
257 ConnectionProtocol::Handshake => {
258 unreachable!()
259 }
260 ConnectionProtocol::Game => {
261 let packet = Arc::new(deserialize_packet::<ClientboundGamePacket>(stream)?);
262 trace!("Packet: {packet:?}");
263 game::process_packet(ecs, entity, packet.as_ref());
264 queued_packet_events
265 .game
266 .push(ReceiveGamePacketEvent { entity, packet });
267 }
268 ConnectionProtocol::Status => {
269 unreachable!()
270 }
271 ConnectionProtocol::Login => {
272 let packet = Arc::new(deserialize_packet::<ClientboundLoginPacket>(stream)?);
273 trace!("Packet: {packet:?}");
274 login::process_packet(ecs, entity, &packet);
275 queued_packet_events
276 .login
277 .push(ReceiveLoginPacketEvent { entity, packet });
278 }
279 ConnectionProtocol::Configuration => {
280 let packet = Arc::new(deserialize_packet::<ClientboundConfigPacket>(stream)?);
281 trace!("Packet: {packet:?}");
282 config::process_packet(ecs, entity, &packet);
283 queued_packet_events
284 .config
285 .push(ReceiveConfigPacketEvent { entity, packet });
286 }
287 };
288
289 Ok(())
290}
291
292pub struct NetworkConnection {
293 reader: RawReadConnection,
294 pub enc_cipher: Option<Aes128CfbEnc>,
296
297 pub writer_task: bevy_tasks::Task<()>,
298 network_packet_writer_tx: mpsc::UnboundedSender<Box<[u8]>>,
302}
303impl NetworkConnection {
304 pub fn write<P: ProtocolPacket + Debug>(
305 &mut self,
306 packet: impl Packet<P>,
307 ) -> Result<(), WritePacketError> {
308 let packet = packet.into_variant();
309 let raw_packet = serialize_packet(&packet)?;
310 self.write_raw(&raw_packet)?;
311
312 Ok(())
313 }
314
315 pub fn write_raw(&mut self, raw_packet: &[u8]) -> Result<(), WritePacketError> {
316 let network_packet = azalea_protocol::write::encode_to_network_packet(
317 raw_packet,
318 self.reader.compression_threshold,
319 &mut self.enc_cipher,
320 );
321 self.network_packet_writer_tx
322 .send(network_packet.into_boxed_slice())?;
323 Ok(())
324 }
325
326 pub fn poll_writer(&mut self) -> Option<()> {
329 let poll_once_res = future::poll_once(&mut self.writer_task);
330 future::block_on(poll_once_res)
331 }
332
333 pub fn set_compression_threshold(&mut self, threshold: Option<u32>) {
334 trace!("Set compression threshold to {threshold:?}");
335 self.reader.compression_threshold = threshold;
336 }
337 pub fn set_encryption_key(&mut self, key: [u8; 16]) {
340 trace!("Enabled protocol encryption");
341 let (enc_cipher, dec_cipher) = azalea_crypto::create_cipher(&key);
342 self.reader.dec_cipher = Some(dec_cipher);
343 self.enc_cipher = Some(enc_cipher);
344 }
345}
346
347async fn write_task(
348 mut network_packet_writer_rx: mpsc::UnboundedReceiver<Box<[u8]>>,
349 mut write_half: OwnedWriteHalf,
350) {
351 while let Some(network_packet) = network_packet_writer_rx.recv().await {
352 if let Err(e) = write_half.write_all(&network_packet).await {
353 debug!("Error writing packet to server: {e}");
354 break;
355 };
356 }
357
358 trace!("write task is done");
359}
360
361#[derive(Error, Debug)]
362pub enum WritePacketError {
363 #[error("Wrong protocol state: expected {expected:?}, got {got:?}")]
364 WrongState {
365 expected: ConnectionProtocol,
366 got: ConnectionProtocol,
367 },
368 #[error(transparent)]
369 Encoding(#[from] azalea_protocol::write::PacketEncodeError),
370 #[error(transparent)]
371 SendError {
372 #[from]
373 #[backtrace]
374 source: mpsc::error::SendError<Box<[u8]>>,
375 },
376}