1use std::{
2 fmt::Debug,
3 io::Cursor,
4 mem,
5 sync::{
6 Arc,
7 atomic::{self, AtomicBool},
8 },
9};
10
11use azalea_crypto::Aes128CfbEnc;
12use azalea_protocol::{
13 connect::{RawReadConnection, RawWriteConnection},
14 packets::{
15 ConnectionProtocol, Packet, ProtocolPacket, config::ClientboundConfigPacket,
16 game::ClientboundGamePacket, login::ClientboundLoginPacket,
17 },
18 read::{ReadPacketError, deserialize_packet},
19 write::serialize_packet,
20};
21use bevy_app::prelude::*;
22use bevy_ecs::prelude::*;
23use bevy_tasks::{IoTaskPool, futures_lite::future};
24use thiserror::Error;
25use tokio::{
26 io::AsyncWriteExt,
27 net::tcp::OwnedWriteHalf,
28 sync::mpsc::{self},
29};
30use tracing::{debug, error, info, trace};
31
32use super::packet::{
33 config::ReceiveConfigPacketEvent, game::ReceiveGamePacketEvent, login::ReceiveLoginPacketEvent,
34};
35use crate::packet::{config, game, login};
36
37pub struct ConnectionPlugin;
38impl Plugin for ConnectionPlugin {
39 fn build(&self, app: &mut App) {
40 app.add_systems(PreUpdate, (read_packets, poll_all_writer_tasks).chain());
41 }
42}
43
44pub fn read_packets(ecs: &mut World) {
45 let mut entity_and_conn_query = ecs.query::<(Entity, &mut RawConnection)>();
46 let mut conn_query = ecs.query::<&mut RawConnection>();
47
48 let mut entities_handling_packets = Vec::new();
49 let mut entities_with_injected_packets = Vec::new();
50 for (entity, mut raw_conn) in entity_and_conn_query.iter_mut(ecs) {
51 if !raw_conn.injected_clientbound_packets.is_empty() {
52 entities_with_injected_packets.push((
53 entity,
54 mem::take(&mut raw_conn.injected_clientbound_packets),
55 ));
56 }
57
58 if raw_conn.network.is_none() {
59 continue;
61 }
62
63 entities_handling_packets.push(entity);
64 }
65
66 let mut queued_packet_events = QueuedPacketEvents::default();
67
68 for (entity, raw_packets) in entities_with_injected_packets {
71 for raw_packet in raw_packets {
72 let conn = conn_query.get(ecs, entity).unwrap();
73 let state = conn.state;
74
75 trace!("Received injected packet with bytes: {raw_packet:?}");
76 if let Err(e) =
77 handle_raw_packet(ecs, &raw_packet, entity, state, &mut queued_packet_events)
78 {
79 error!("Error reading injected packet: {e}");
80 }
81 }
82 }
83
84 for entity in entities_handling_packets {
85 loop {
86 let mut conn = conn_query.get_mut(ecs, entity).unwrap();
87 let net_conn = conn.net_conn().unwrap();
88 let read_res = net_conn.reader.try_read();
89 let state = conn.state;
90 match read_res {
91 Ok(Some(raw_packet)) => {
92 let raw_packet = Arc::<[u8]>::from(raw_packet);
93 if let Err(e) = handle_raw_packet(
94 ecs,
95 &raw_packet,
96 entity,
97 state,
98 &mut queued_packet_events,
99 ) {
100 error!("Error reading packet: {e}");
101 }
102 }
103 Ok(None) => {
104 break;
106 }
107 Err(err) => {
108 log_for_error(&err);
109
110 if matches!(
111 &*err,
112 ReadPacketError::IoError { .. } | ReadPacketError::ConnectionClosed
113 ) {
114 info!("Server closed connection");
115 conn.network = None;
117 conn.is_alive = false;
119 }
120
121 break;
122 }
123 }
124 }
125 }
126
127 queued_packet_events.write_messages(ecs);
128}
129
130fn poll_all_writer_tasks(mut conn_query: Query<&mut RawConnection>) {
131 for mut conn in conn_query.iter_mut() {
132 if let Some(net_conn) = &mut conn.network {
133 if net_conn.poll_writer().is_some() {
137 conn.network = None;
139 conn.is_alive = false;
140 }
141 }
142 }
143}
144
145#[derive(Default)]
146pub struct QueuedPacketEvents {
147 login: Vec<ReceiveLoginPacketEvent>,
148 config: Vec<ReceiveConfigPacketEvent>,
149 game: Vec<ReceiveGamePacketEvent>,
150}
151impl QueuedPacketEvents {
152 fn write_messages(&mut self, ecs: &mut World) {
153 ecs.write_message_batch(self.login.drain(..));
154 ecs.write_message_batch(self.config.drain(..));
155 ecs.write_message_batch(self.game.drain(..));
156 }
157}
158
159fn log_for_error(error: &ReadPacketError) {
160 if !matches!(*error, ReadPacketError::ConnectionClosed) {
161 error!("Error reading packet from Client: {error:?}");
162 }
163}
164
165#[derive(Component)]
167pub struct RawConnection {
168 pub(crate) network: Option<NetworkConnection>,
181 pub state: ConnectionProtocol,
182 pub(crate) is_alive: bool,
183
184 pub injected_clientbound_packets: Vec<Box<[u8]>>,
190}
191impl RawConnection {
192 pub fn new(
193 reader: RawReadConnection,
194 writer: RawWriteConnection,
195 state: ConnectionProtocol,
196 ) -> Self {
197 let task_pool = IoTaskPool::get();
198
199 let (network_packet_writer_tx, network_packet_writer_rx) =
200 mpsc::unbounded_channel::<Box<[u8]>>();
201
202 let writer_task =
203 task_pool.spawn(write_task(network_packet_writer_rx, writer.write_stream));
204
205 let mut conn = Self::new_networkless(state);
206 conn.network = Some(NetworkConnection {
207 reader,
208 enc_cipher: writer.enc_cipher,
209 network_packet_writer_tx,
210 writer_task,
211 });
212
213 conn
214 }
215
216 pub fn new_networkless(state: ConnectionProtocol) -> Self {
217 Self {
218 network: None,
219 state,
220 is_alive: true,
221 injected_clientbound_packets: Vec::new(),
222 }
223 }
224
225 pub fn is_alive(&self) -> bool {
226 self.is_alive
227 }
228
229 pub fn write<P: ProtocolPacket + Debug>(
238 &mut self,
239 packet: impl Packet<P>,
240 ) -> Result<(), WritePacketError> {
241 if let Some(network) = &mut self.network {
242 network.write(packet)?;
243 } else {
244 static WARNED: AtomicBool = AtomicBool::new(false);
245 if !WARNED.swap(true, atomic::Ordering::Relaxed) {
246 debug!(
247 "tried to write packet to the network but there is no NetworkConnection. if you're trying to send a packet from the handler function, use self.write instead"
248 );
249 }
250 }
251 Ok(())
252 }
253
254 pub fn net_conn(&mut self) -> Option<&mut NetworkConnection> {
255 self.network.as_mut()
256 }
257}
258
259pub fn handle_raw_packet(
260 ecs: &mut World,
261 raw_packet: &[u8],
262 entity: Entity,
263 state: ConnectionProtocol,
264 queued_packet_events: &mut QueuedPacketEvents,
265) -> Result<(), Box<ReadPacketError>> {
266 let stream = &mut Cursor::new(raw_packet);
267 match state {
268 ConnectionProtocol::Handshake => {
269 unreachable!()
270 }
271 ConnectionProtocol::Game => {
272 let packet = Arc::new(deserialize_packet::<ClientboundGamePacket>(stream)?);
273 trace!("Packet: {packet:?}");
274 game::process_packet(ecs, entity, packet.as_ref());
275 queued_packet_events
276 .game
277 .push(ReceiveGamePacketEvent { entity, packet });
278 }
279 ConnectionProtocol::Status => {
280 unreachable!()
281 }
282 ConnectionProtocol::Login => {
283 let packet = Arc::new(deserialize_packet::<ClientboundLoginPacket>(stream)?);
284 trace!("Packet: {packet:?}");
285 login::process_packet(ecs, entity, &packet);
286 queued_packet_events
287 .login
288 .push(ReceiveLoginPacketEvent { entity, packet });
289 }
290 ConnectionProtocol::Configuration => {
291 let packet = Arc::new(deserialize_packet::<ClientboundConfigPacket>(stream)?);
292 trace!("Packet: {packet:?}");
293 config::process_packet(ecs, entity, &packet);
294 queued_packet_events
295 .config
296 .push(ReceiveConfigPacketEvent { entity, packet });
297 }
298 };
299
300 Ok(())
301}
302
303pub struct NetworkConnection {
304 reader: RawReadConnection,
305 pub enc_cipher: Option<Aes128CfbEnc>,
307
308 pub writer_task: bevy_tasks::Task<()>,
309 network_packet_writer_tx: mpsc::UnboundedSender<Box<[u8]>>,
314}
315impl NetworkConnection {
316 pub fn write<P: ProtocolPacket + Debug>(
317 &mut self,
318 packet: impl Packet<P>,
319 ) -> Result<(), WritePacketError> {
320 let packet = packet.into_variant();
321 let raw_packet = serialize_packet(&packet)?;
322 self.write_raw(&raw_packet)?;
323
324 Ok(())
325 }
326
327 pub fn write_raw(&mut self, raw_packet: &[u8]) -> Result<(), WritePacketError> {
328 let network_packet = azalea_protocol::write::encode_to_network_packet(
329 raw_packet,
330 self.reader.compression_threshold,
331 &mut self.enc_cipher,
332 );
333 self.network_packet_writer_tx
334 .send(network_packet.into_boxed_slice())?;
335 Ok(())
336 }
337
338 pub fn poll_writer(&mut self) -> Option<()> {
341 let poll_once_res = future::poll_once(&mut self.writer_task);
342 future::block_on(poll_once_res)
343 }
344
345 pub fn set_compression_threshold(&mut self, threshold: Option<u32>) {
346 trace!("Set compression threshold to {threshold:?}");
347 self.reader.compression_threshold = threshold;
348 }
349 pub fn set_encryption_key(&mut self, key: [u8; 16]) {
353 trace!("Enabled protocol encryption");
354 let (enc_cipher, dec_cipher) = azalea_crypto::create_cipher(&key);
355 self.reader.dec_cipher = Some(dec_cipher);
356 self.enc_cipher = Some(enc_cipher);
357 }
358}
359
360async fn write_task(
361 mut network_packet_writer_rx: mpsc::UnboundedReceiver<Box<[u8]>>,
362 mut write_half: OwnedWriteHalf,
363) {
364 while let Some(network_packet) = network_packet_writer_rx.recv().await {
365 if let Err(e) = write_half.write_all(&network_packet).await {
366 debug!("Error writing packet to server: {e}");
367 break;
368 };
369 }
370
371 trace!("write task is done");
372}
373
374#[derive(Error, Debug)]
375pub enum WritePacketError {
376 #[error("Wrong protocol state: expected {expected:?}, got {got:?}")]
377 WrongState {
378 expected: ConnectionProtocol,
379 got: ConnectionProtocol,
380 },
381 #[error(transparent)]
382 Encoding(#[from] azalea_protocol::write::PacketEncodeError),
383 #[error(transparent)]
384 SendError {
385 #[from]
386 #[backtrace]
387 source: mpsc::error::SendError<Box<[u8]>>,
388 },
389}