1use std::{fmt::Debug, io::Cursor, mem, sync::Arc};
2
3use azalea_crypto::Aes128CfbEnc;
4use azalea_protocol::{
5 connect::{RawReadConnection, RawWriteConnection},
6 packets::{
7 ConnectionProtocol, Packet, ProtocolPacket, config::ClientboundConfigPacket,
8 game::ClientboundGamePacket, login::ClientboundLoginPacket,
9 },
10 read::{ReadPacketError, deserialize_packet},
11 write::serialize_packet,
12};
13use bevy_app::prelude::*;
14use bevy_ecs::prelude::*;
15use bevy_tasks::{IoTaskPool, futures_lite::future};
16use thiserror::Error;
17use tokio::{
18 io::AsyncWriteExt,
19 net::tcp::OwnedWriteHalf,
20 sync::mpsc::{self},
21};
22use tracing::{debug, error, info, trace};
23
24use super::packet::{
25 config::ReceiveConfigPacketEvent, game::ReceiveGamePacketEvent, login::ReceiveLoginPacketEvent,
26};
27use crate::packet::{config, game, login};
28
29pub struct ConnectionPlugin;
30impl Plugin for ConnectionPlugin {
31 fn build(&self, app: &mut App) {
32 app.add_systems(PreUpdate, (read_packets, poll_all_writer_tasks).chain());
33 }
34}
35
36pub fn read_packets(ecs: &mut World) {
37 let mut entity_and_conn_query = ecs.query::<(Entity, &mut RawConnection)>();
38 let mut conn_query = ecs.query::<&mut RawConnection>();
39
40 let mut entities_handling_packets = Vec::new();
41 let mut entities_with_injected_packets = Vec::new();
42 for (entity, mut raw_conn) in entity_and_conn_query.iter_mut(ecs) {
43 if !raw_conn.injected_clientbound_packets.is_empty() {
44 entities_with_injected_packets.push((
45 entity,
46 mem::take(&mut raw_conn.injected_clientbound_packets),
47 ));
48 }
49
50 if raw_conn.network.is_none() {
51 continue;
53 }
54
55 entities_handling_packets.push(entity);
56 }
57
58 let mut queued_packet_events = QueuedPacketEvents::default();
59
60 for (entity, raw_packets) in entities_with_injected_packets {
63 for raw_packet in raw_packets {
64 let conn = conn_query.get(ecs, entity).unwrap();
65 let state = conn.state;
66
67 trace!("Received injected packet with bytes: {raw_packet:?}");
68 if let Err(e) =
69 handle_raw_packet(ecs, &raw_packet, entity, state, &mut queued_packet_events)
70 {
71 error!("Error reading injected packet: {e}");
72 }
73 }
74 }
75
76 for entity in entities_handling_packets {
77 loop {
78 let mut conn = conn_query.get_mut(ecs, entity).unwrap();
79 let net_conn = conn.net_conn().unwrap();
80 let read_res = net_conn.reader.try_read();
81 let state = conn.state;
82 match read_res {
83 Ok(Some(raw_packet)) => {
84 let raw_packet = Arc::<[u8]>::from(raw_packet);
85 if let Err(e) = handle_raw_packet(
86 ecs,
87 &raw_packet,
88 entity,
89 state,
90 &mut queued_packet_events,
91 ) {
92 error!("Error reading packet: {e}");
93 }
94 }
95 Ok(None) => {
96 break;
98 }
99 Err(err) => {
100 log_for_error(&err);
101
102 if matches!(
103 &*err,
104 ReadPacketError::IoError { .. } | ReadPacketError::ConnectionClosed
105 ) {
106 info!("Server closed connection");
107 conn.network = None;
109 conn.is_alive = false;
111 }
112
113 break;
114 }
115 }
116 }
117 }
118
119 queued_packet_events.send_events(ecs);
120}
121
122fn poll_all_writer_tasks(mut conn_query: Query<&mut RawConnection>) {
123 for mut conn in conn_query.iter_mut() {
124 if let Some(net_conn) = &mut conn.network {
125 if net_conn.poll_writer().is_some() {
129 conn.network = None;
131 conn.is_alive = false;
132 }
133 }
134 }
135}
136
137#[derive(Default)]
138pub struct QueuedPacketEvents {
139 login: Vec<ReceiveLoginPacketEvent>,
140 config: Vec<ReceiveConfigPacketEvent>,
141 game: Vec<ReceiveGamePacketEvent>,
142}
143impl QueuedPacketEvents {
144 fn send_events(&mut self, ecs: &mut World) {
145 ecs.send_event_batch(self.login.drain(..));
146 ecs.send_event_batch(self.config.drain(..));
147 ecs.send_event_batch(self.game.drain(..));
148 }
149}
150
151fn log_for_error(error: &ReadPacketError) {
152 if !matches!(*error, ReadPacketError::ConnectionClosed) {
153 error!("Error reading packet from Client: {error:?}");
154 }
155}
156
157#[derive(Component)]
159pub struct RawConnection {
160 network: Option<NetworkConnection>,
173 pub state: ConnectionProtocol,
174 is_alive: bool,
175
176 pub injected_clientbound_packets: Vec<Box<[u8]>>,
181}
182impl RawConnection {
183 pub fn new(
184 reader: RawReadConnection,
185 writer: RawWriteConnection,
186 state: ConnectionProtocol,
187 ) -> Self {
188 let task_pool = IoTaskPool::get();
189
190 let (network_packet_writer_tx, network_packet_writer_rx) =
191 mpsc::unbounded_channel::<Box<[u8]>>();
192
193 let writer_task =
194 task_pool.spawn(write_task(network_packet_writer_rx, writer.write_stream));
195
196 let mut conn = Self::new_networkless(state);
197 conn.network = Some(NetworkConnection {
198 reader,
199 enc_cipher: writer.enc_cipher,
200 network_packet_writer_tx,
201 writer_task,
202 });
203
204 conn
205 }
206
207 pub fn new_networkless(state: ConnectionProtocol) -> Self {
208 Self {
209 network: None,
210 state,
211 is_alive: true,
212 injected_clientbound_packets: Vec::new(),
213 }
214 }
215
216 pub fn is_alive(&self) -> bool {
217 self.is_alive
218 }
219
220 pub fn write<P: ProtocolPacket + Debug>(
229 &mut self,
230 packet: impl Packet<P>,
231 ) -> Result<(), WritePacketError> {
232 if let Some(network) = &mut self.network {
233 network.write(packet)?;
234 } else {
235 debug!(
236 "tried to write packet to the network but there is no NetworkConnection. if you're trying to send a packet from the handler function, use self.write instead"
237 );
238 }
239 Ok(())
240 }
241
242 pub fn net_conn(&mut self) -> Option<&mut NetworkConnection> {
243 self.network.as_mut()
244 }
245}
246
247pub fn handle_raw_packet(
248 ecs: &mut World,
249 raw_packet: &[u8],
250 entity: Entity,
251 state: ConnectionProtocol,
252 queued_packet_events: &mut QueuedPacketEvents,
253) -> Result<(), Box<ReadPacketError>> {
254 let stream = &mut Cursor::new(raw_packet);
255 match state {
256 ConnectionProtocol::Handshake => {
257 unreachable!()
258 }
259 ConnectionProtocol::Game => {
260 let packet = Arc::new(deserialize_packet::<ClientboundGamePacket>(stream)?);
261 trace!("Packet: {packet:?}");
262 game::process_packet(ecs, entity, packet.as_ref());
263 queued_packet_events
264 .game
265 .push(ReceiveGamePacketEvent { entity, packet });
266 }
267 ConnectionProtocol::Status => {
268 unreachable!()
269 }
270 ConnectionProtocol::Login => {
271 let packet = Arc::new(deserialize_packet::<ClientboundLoginPacket>(stream)?);
272 trace!("Packet: {packet:?}");
273 login::process_packet(ecs, entity, &packet);
274 queued_packet_events
275 .login
276 .push(ReceiveLoginPacketEvent { entity, packet });
277 }
278 ConnectionProtocol::Configuration => {
279 let packet = Arc::new(deserialize_packet::<ClientboundConfigPacket>(stream)?);
280 trace!("Packet: {packet:?}");
281 config::process_packet(ecs, entity, &packet);
282 queued_packet_events
283 .config
284 .push(ReceiveConfigPacketEvent { entity, packet });
285 }
286 };
287
288 Ok(())
289}
290
291pub struct NetworkConnection {
292 reader: RawReadConnection,
293 pub enc_cipher: Option<Aes128CfbEnc>,
295
296 pub writer_task: bevy_tasks::Task<()>,
297 network_packet_writer_tx: mpsc::UnboundedSender<Box<[u8]>>,
301}
302impl NetworkConnection {
303 pub fn write<P: ProtocolPacket + Debug>(
304 &mut self,
305 packet: impl Packet<P>,
306 ) -> Result<(), WritePacketError> {
307 let packet = packet.into_variant();
308 let raw_packet = serialize_packet(&packet)?;
309 self.write_raw(&raw_packet)?;
310
311 Ok(())
312 }
313
314 pub fn write_raw(&mut self, raw_packet: &[u8]) -> Result<(), WritePacketError> {
315 let network_packet = azalea_protocol::write::encode_to_network_packet(
316 raw_packet,
317 self.reader.compression_threshold,
318 &mut self.enc_cipher,
319 );
320 self.network_packet_writer_tx
321 .send(network_packet.into_boxed_slice())?;
322 Ok(())
323 }
324
325 pub fn poll_writer(&mut self) -> Option<()> {
328 let poll_once_res = future::poll_once(&mut self.writer_task);
329 future::block_on(poll_once_res)
330 }
331
332 pub fn set_compression_threshold(&mut self, threshold: Option<u32>) {
333 trace!("Set compression threshold to {threshold:?}");
334 self.reader.compression_threshold = threshold;
335 }
336 pub fn set_encryption_key(&mut self, key: [u8; 16]) {
339 trace!("Enabled protocol encryption");
340 let (enc_cipher, dec_cipher) = azalea_crypto::create_cipher(&key);
341 self.reader.dec_cipher = Some(dec_cipher);
342 self.enc_cipher = Some(enc_cipher);
343 }
344}
345
346async fn write_task(
347 mut network_packet_writer_rx: mpsc::UnboundedReceiver<Box<[u8]>>,
348 mut write_half: OwnedWriteHalf,
349) {
350 while let Some(network_packet) = network_packet_writer_rx.recv().await {
351 if let Err(e) = write_half.write_all(&network_packet).await {
352 debug!("Error writing packet to server: {e}");
353 break;
354 };
355 }
356
357 trace!("write task is done");
358}
359
360#[derive(Error, Debug)]
361pub enum WritePacketError {
362 #[error("Wrong protocol state: expected {expected:?}, got {got:?}")]
363 WrongState {
364 expected: ConnectionProtocol,
365 got: ConnectionProtocol,
366 },
367 #[error(transparent)]
368 Encoding(#[from] azalea_protocol::write::PacketEncodeError),
369 #[error(transparent)]
370 SendError {
371 #[from]
372 #[backtrace]
373 source: mpsc::error::SendError<Box<[u8]>>,
374 },
375}