1use std::{
6 io::Cursor,
7 ops::Deref,
8 time::{Duration, Instant},
9};
10
11use azalea_core::position::ChunkPos;
12use azalea_protocol::packets::game::{
13 c_level_chunk_with_light::ClientboundLevelChunkWithLight,
14 s_chunk_batch_received::ServerboundChunkBatchReceived,
15};
16use bevy_app::{App, Plugin, Update};
17use bevy_ecs::prelude::*;
18use simdnbt::owned::BaseNbt;
19use tracing::{error, trace};
20
21use crate::{
22 interact::handle_block_interact_event,
23 inventory::InventorySet,
24 packet_handling::game::{handle_send_packet_event, SendPacketEvent},
25 respawn::perform_respawn,
26 InstanceHolder,
27};
28
29pub struct ChunkPlugin;
30impl Plugin for ChunkPlugin {
31 fn build(&self, app: &mut App) {
32 app.add_systems(
33 Update,
34 (
35 handle_chunk_batch_start_event,
36 handle_receive_chunk_events,
37 handle_chunk_batch_finished_event,
38 )
39 .chain()
40 .before(handle_send_packet_event)
41 .before(InventorySet)
42 .before(handle_block_interact_event)
43 .before(perform_respawn),
44 )
45 .add_event::<ReceiveChunkEvent>()
46 .add_event::<ChunkBatchStartEvent>()
47 .add_event::<ChunkBatchFinishedEvent>();
48 }
49}
50
51#[derive(Event)]
52pub struct ReceiveChunkEvent {
53 pub entity: Entity,
54 pub packet: ClientboundLevelChunkWithLight,
55}
56
57#[derive(Component, Clone, Debug)]
58pub struct ChunkBatchInfo {
59 pub start_time: Instant,
60 pub aggregated_duration_per_chunk: Duration,
61 pub old_samples_weight: u32,
62}
63
64#[derive(Event)]
65pub struct ChunkBatchStartEvent {
66 pub entity: Entity,
67}
68#[derive(Event)]
69pub struct ChunkBatchFinishedEvent {
70 pub entity: Entity,
71 pub batch_size: u32,
72}
73
74pub fn handle_receive_chunk_events(
75 mut events: EventReader<ReceiveChunkEvent>,
76 mut query: Query<&mut InstanceHolder>,
77) {
78 for event in events.read() {
79 let pos = ChunkPos::new(event.packet.x, event.packet.z);
80
81 let local_player = query.get_mut(event.entity).unwrap();
82
83 let mut instance = local_player.instance.write();
84 let mut partial_instance = local_player.partial_instance.write();
85
86 let shared_chunk = instance.chunks.get(&pos);
91 let this_client_has_chunk = partial_instance.chunks.limited_get(&pos).is_some();
92
93 if !this_client_has_chunk {
94 if let Some(shared_chunk) = shared_chunk {
95 trace!("Skipping parsing chunk {pos:?} because we already know about it");
96 partial_instance
97 .chunks
98 .limited_set(&pos, Some(shared_chunk));
99 continue;
100 }
101 }
102
103 let heightmaps_nbt = &event.packet.chunk_data.heightmaps;
104 let empty_nbt = BaseNbt::default();
106 let heightmaps = heightmaps_nbt.unwrap_or(&empty_nbt).deref();
107
108 if let Err(e) = partial_instance.chunks.replace_with_packet_data(
109 &pos,
110 &mut Cursor::new(&event.packet.chunk_data.data),
111 heightmaps,
112 &mut instance.chunks,
113 ) {
114 error!(
115 "Couldn't set chunk data: {e}. World height: {}",
116 instance.chunks.height
117 );
118 }
119 }
120}
121
122impl ChunkBatchInfo {
123 pub fn batch_finished(&mut self, batch_size: u32) {
124 if batch_size == 0 {
125 return;
126 }
127 let batch_duration = self.start_time.elapsed();
128 let duration_per_chunk = batch_duration / batch_size;
129 let clamped_duration = Duration::clamp(
130 duration_per_chunk,
131 self.aggregated_duration_per_chunk / 3,
132 self.aggregated_duration_per_chunk * 3,
133 );
134 self.aggregated_duration_per_chunk =
135 ((self.aggregated_duration_per_chunk * self.old_samples_weight) + clamped_duration)
136 / (self.old_samples_weight + 1);
137 self.old_samples_weight = u32::min(49, self.old_samples_weight + 1);
138 }
139
140 pub fn desired_chunks_per_tick(&self) -> f32 {
141 (7000000. / self.aggregated_duration_per_chunk.as_nanos() as f64) as f32
142 }
143}
144
145pub fn handle_chunk_batch_start_event(
146 mut query: Query<&mut ChunkBatchInfo>,
147 mut events: EventReader<ChunkBatchStartEvent>,
148) {
149 for event in events.read() {
150 if let Ok(mut chunk_batch_info) = query.get_mut(event.entity) {
151 chunk_batch_info.start_time = Instant::now();
152 }
153 }
154}
155
156pub fn handle_chunk_batch_finished_event(
157 mut query: Query<&mut ChunkBatchInfo>,
158 mut events: EventReader<ChunkBatchFinishedEvent>,
159 mut send_packets: EventWriter<SendPacketEvent>,
160) {
161 for event in events.read() {
162 if let Ok(mut chunk_batch_info) = query.get_mut(event.entity) {
163 chunk_batch_info.batch_finished(event.batch_size);
164 let desired_chunks_per_tick = chunk_batch_info.desired_chunks_per_tick();
165 send_packets.send(SendPacketEvent::new(
166 event.entity,
167 ServerboundChunkBatchReceived {
168 desired_chunks_per_tick,
169 },
170 ));
171 }
172 }
173}
174
175#[derive(Clone, Debug)]
176pub struct ChunkReceiveSpeedAccumulator {
177 batch_sizes: Vec<u32>,
178 batch_durations: Vec<u32>,
180 index: usize,
181 filled_size: usize,
182}
183impl ChunkReceiveSpeedAccumulator {
184 pub fn new(capacity: usize) -> Self {
185 Self {
186 batch_sizes: vec![0; capacity],
187 batch_durations: vec![0; capacity],
188 index: 0,
189 filled_size: 0,
190 }
191 }
192
193 pub fn accumulate(&mut self, batch_size: u32, batch_duration: Duration) {
194 self.batch_sizes[self.index] = batch_size;
195 self.batch_durations[self.index] =
196 f32::clamp(batch_duration.as_millis() as f32, 0., 15000.) as u32;
197 self.index = (self.index + 1) % self.batch_sizes.len();
198 if self.filled_size < self.batch_sizes.len() {
199 self.filled_size += 1;
200 }
201 }
202
203 pub fn get_millis_per_chunk(&self) -> f64 {
204 let mut total_batch_size = 0;
205 let mut total_batch_duration = 0;
206 for i in 0..self.filled_size {
207 total_batch_size += self.batch_sizes[i];
208 total_batch_duration += self.batch_durations[i];
209 }
210 if total_batch_size == 0 {
211 return 0.;
212 }
213 total_batch_duration as f64 / total_batch_size as f64
214 }
215}
216
217impl Default for ChunkBatchInfo {
218 fn default() -> Self {
219 Self {
220 start_time: Instant::now(),
221 aggregated_duration_per_chunk: Duration::from_millis(2),
222 old_samples_weight: 1,
223 }
224 }
225}