// azalea_client/plugins/chunks.rs

use std::{
    io::Cursor,
    time::{Duration, Instant},
};

use azalea_core::position::ChunkPos;
use azalea_protocol::packets::game::{
    c_level_chunk_with_light::ClientboundLevelChunkWithLight,
    s_chunk_batch_received::ServerboundChunkBatchReceived,
};
use bevy_app::{App, Plugin, Update};
use bevy_ecs::prelude::*;
use tracing::{error, trace};

use crate::{
    interact::handle_start_use_item_queued, inventory::InventorySet, local_player::InstanceHolder,
    packet::game::SendPacketEvent, respawn::perform_respawn,
};
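
/// A [`Plugin`] that registers the chunk-related events and schedules the
/// systems that parse incoming chunk data and answer chunk batch packets.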
pub struct ChunksPlugin;
impl Plugin for ChunksPlugin {
    fn build(&self, app: &mut App) {
        app.add_systems(
            Update,
            (
                handle_chunk_batch_start_event,
                handle_receive_chunk_events,
                handle_chunk_batch_finished_event,
            )
                .chain()
                .before(InventorySet)
                .before(handle_start_use_item_queued)
                .before(perform_respawn),
        )
        .add_event::<ReceiveChunkEvent>()
        .add_event::<ChunkBatchStartEvent>()
        .add_event::<ChunkBatchFinishedEvent>();
    }
}
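
/// An event carrying a [`ClientboundLevelChunkWithLight`] packet so
/// [`handle_receive_chunk_events`] can parse it into the player's instance.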
#[derive(Event)]
pub struct ReceiveChunkEvent {
    pub entity: Entity,
    pub packet: ClientboundLevelChunkWithLight,
}
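
/// Tracks how long this client spends processing each chunk batch, as a
/// clamped, weighted moving average of the time per chunk.
///
/// A minimal sketch of the feedback loop (the batch size here is made up):
///
/// ```ignore
/// let mut info = ChunkBatchInfo::default();
/// // ... a batch of 16 chunks is received and parsed ...
/// info.batch_finished(16);
/// // this rate is what gets reported back to the server
/// let chunks_per_tick = info.desired_chunks_per_tick();
/// ```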
#[derive(Component, Clone, Debug)]
pub struct ChunkBatchInfo {
    pub start_time: Instant,
    pub aggregated_duration_per_chunk: Duration,
    pub old_samples_weight: u32,
}
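
/// An event marking the start of a chunk batch. [`handle_chunk_batch_start_event`]
/// responds by recording the current time in the client's [`ChunkBatchInfo`].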
#[derive(Event)]
pub struct ChunkBatchStartEvent {
    pub entity: Entity,
}
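/// An event marking the end of a chunk batch, carrying how many chunks it
/// contained so the client can update its timing estimate and reply to the
/// server.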
#[derive(Event)]
pub struct ChunkBatchFinishedEvent {
    pub entity: Entity,
    pub batch_size: u32,
}
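
/// Parses received chunk packets into the client's [`InstanceHolder`].
///
/// If the shared instance already has the chunk (for example because another
/// client in the same world parsed it first) and this client doesn't, the
/// shared copy is reused instead of parsing the packet again.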
pub fn handle_receive_chunk_events(
    mut events: EventReader<ReceiveChunkEvent>,
    mut query: Query<&InstanceHolder>,
) {
    for event in events.read() {
        let pos = ChunkPos::new(event.packet.x, event.packet.z);

        let local_player = query.get_mut(event.entity).unwrap();

        let mut instance = local_player.instance.write();
        let mut partial_instance = local_player.partial_instance.write();

        let shared_chunk = instance.chunks.get(&pos);
        let this_client_has_chunk = partial_instance.chunks.limited_get(&pos).is_some();

        if !this_client_has_chunk && let Some(shared_chunk) = shared_chunk {
            trace!("Skipping parsing chunk {pos:?} because we already know about it");
            partial_instance
                .chunks
                .limited_set(&pos, Some(shared_chunk));
            continue;
        }

        let heightmaps = &event.packet.chunk_data.heightmaps;

        if let Err(e) = partial_instance.chunks.replace_with_packet_data(
            &pos,
            &mut Cursor::new(&event.packet.chunk_data.data),
            heightmaps,
            &mut instance.chunks,
        ) {
            error!(
                "Couldn't set chunk data: {e}. World height: {}",
                instance.chunks.height
            );
        }
    }
}
impl ChunkBatchInfo {
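    /// Fold the just-finished batch into the running estimate of how long each
    /// chunk takes to process. The new sample is clamped to within 3x of the
    /// current average, and at most 49 previous samples are weighted against it.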
    pub fn batch_finished(&mut self, batch_size: u32) {
        if batch_size == 0 {
            return;
        }
        let batch_duration = self.start_time.elapsed();
        let duration_per_chunk = batch_duration / batch_size;
        let clamped_duration = Duration::clamp(
            duration_per_chunk,
            self.aggregated_duration_per_chunk / 3,
            self.aggregated_duration_per_chunk * 3,
        );
        self.aggregated_duration_per_chunk =
            ((self.aggregated_duration_per_chunk * self.old_samples_weight) + clamped_duration)
                / (self.old_samples_weight + 1);
        self.old_samples_weight = u32::min(49, self.old_samples_weight + 1);
    }
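
    /// The chunk rate to report to the server: how many chunks fit into a
    /// 7,000,000 ns (7 ms) processing budget per tick at the current average
    /// duration per chunk.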
    pub fn desired_chunks_per_tick(&self) -> f32 {
        (7000000. / self.aggregated_duration_per_chunk.as_nanos() as f64) as f32
    }
}
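
/// Records when a chunk batch started so its duration can be measured once the
/// matching [`ChunkBatchFinishedEvent`] arrives.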
pub fn handle_chunk_batch_start_event(
    mut query: Query<&mut ChunkBatchInfo>,
    mut events: EventReader<ChunkBatchStartEvent>,
) {
    for event in events.read() {
        if let Ok(mut chunk_batch_info) = query.get_mut(event.entity) {
            chunk_batch_info.start_time = Instant::now();
        }
    }
}
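
/// Updates the client's [`ChunkBatchInfo`] with the finished batch and replies
/// with a [`ServerboundChunkBatchReceived`] packet telling the server how many
/// chunks per tick this client wants.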
pub fn handle_chunk_batch_finished_event(
    mut query: Query<&mut ChunkBatchInfo>,
    mut events: EventReader<ChunkBatchFinishedEvent>,
    mut commands: Commands,
) {
    for event in events.read() {
        if let Ok(mut chunk_batch_info) = query.get_mut(event.entity) {
            chunk_batch_info.batch_finished(event.batch_size);
            let desired_chunks_per_tick = chunk_batch_info.desired_chunks_per_tick();
            commands.trigger(SendPacketEvent::new(
                event.entity,
                ServerboundChunkBatchReceived {
                    desired_chunks_per_tick,
                },
            ));
        }
    }
}
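
/// A fixed-capacity ring buffer of recent chunk batch sizes and durations, used
/// to estimate the average number of milliseconds spent per chunk.
///
/// A minimal usage sketch (the capacity and batch numbers are made up):
///
/// ```ignore
/// let mut acc = ChunkReceiveSpeedAccumulator::new(50);
/// acc.accumulate(16, Duration::from_millis(40));
/// let millis_per_chunk = acc.get_millis_per_chunk(); // 40 / 16 = 2.5
/// ```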
#[derive(Clone, Debug)]
pub struct ChunkReceiveSpeedAccumulator {
    batch_sizes: Vec<u32>,
    /// Batch durations, in milliseconds.
    batch_durations: Vec<u32>,
    index: usize,
    filled_size: usize,
}
impl ChunkReceiveSpeedAccumulator {
    pub fn new(capacity: usize) -> Self {
        Self {
            batch_sizes: vec![0; capacity],
            batch_durations: vec![0; capacity],
            index: 0,
            filled_size: 0,
        }
    }
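
    /// Records one batch's size and duration. The duration is stored in
    /// milliseconds, clamped to at most 15 seconds, and the oldest entry is
    /// overwritten once the buffer is full.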
    pub fn accumulate(&mut self, batch_size: u32, batch_duration: Duration) {
        self.batch_sizes[self.index] = batch_size;
        self.batch_durations[self.index] =
            f32::clamp(batch_duration.as_millis() as f32, 0., 15000.) as u32;
        self.index = (self.index + 1) % self.batch_sizes.len();
        if self.filled_size < self.batch_sizes.len() {
            self.filled_size += 1;
        }
    }
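
    /// The average number of milliseconds spent per chunk across the recorded
    /// batches, or 0 if no chunks have been recorded yet.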
    pub fn get_millis_per_chunk(&self) -> f64 {
        let mut total_batch_size = 0;
        let mut total_batch_duration = 0;
        for i in 0..self.filled_size {
            total_batch_size += self.batch_sizes[i];
            total_batch_duration += self.batch_durations[i];
        }
        if total_batch_size == 0 {
            return 0.;
        }
        total_batch_duration as f64 / total_batch_size as f64
    }
}
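
// Until a real batch has been measured, assume 2 ms per chunk with the weight
// of a single sample.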
impl Default for ChunkBatchInfo {
    fn default() -> Self {
        Self {
            start_time: Instant::now(),
            aggregated_duration_per_chunk: Duration::from_millis(2),
            old_samples_weight: 1,
        }
    }
}