azalea_client/plugins/
chunks.rs1use std::{
6 io::Cursor,
7 time::{Duration, Instant},
8};
9
10use azalea_core::position::ChunkPos;
11use azalea_protocol::packets::game::{
12 c_level_chunk_with_light::ClientboundLevelChunkWithLight,
13 s_chunk_batch_received::ServerboundChunkBatchReceived,
14};
15use bevy_app::{App, Plugin, Update};
16use bevy_ecs::prelude::*;
17use tracing::{error, trace};
18
19use super::packet::game::handle_outgoing_packets;
20use crate::{
21 InstanceHolder, interact::handle_block_interact_event, inventory::InventorySet,
22 packet::game::SendPacketEvent, respawn::perform_respawn,
23};
24
25pub struct ChunksPlugin;
26impl Plugin for ChunksPlugin {
27 fn build(&self, app: &mut App) {
28 app.add_systems(
29 Update,
30 (
31 handle_chunk_batch_start_event,
32 handle_receive_chunk_events,
33 handle_chunk_batch_finished_event,
34 )
35 .chain()
36 .before(handle_outgoing_packets)
37 .before(InventorySet)
38 .before(handle_block_interact_event)
39 .before(perform_respawn),
40 )
41 .add_event::<ReceiveChunkEvent>()
42 .add_event::<ChunkBatchStartEvent>()
43 .add_event::<ChunkBatchFinishedEvent>();
44 }
45}
46
/// Event fired when a chunk-with-light packet arrives for a client entity.
#[derive(Event)]
pub struct ReceiveChunkEvent {
    /// The client entity the chunk packet was received for.
    pub entity: Entity,
    /// The raw chunk packet to be parsed into the world.
    pub packet: ClientboundLevelChunkWithLight,
}
52
/// Per-client state used to estimate how fast this client can receive chunks,
/// mirroring the server's chunk batching protocol.
#[derive(Component, Clone, Debug)]
pub struct ChunkBatchInfo {
    /// When the current chunk batch started (set on `ChunkBatchStartEvent`).
    pub start_time: Instant,
    /// Rolling weighted average of how long one chunk takes to process.
    pub aggregated_duration_per_chunk: Duration,
    /// Weight given to previous samples in the rolling average (capped at 49).
    pub old_samples_weight: u32,
}
59
/// Event fired when the server announces the start of a chunk batch for the
/// given client entity.
#[derive(Event)]
pub struct ChunkBatchStartEvent {
    pub entity: Entity,
}
/// Event fired when the server announces the end of a chunk batch for the
/// given client entity.
#[derive(Event)]
pub struct ChunkBatchFinishedEvent {
    pub entity: Entity,
    /// How many chunks were in the batch that just finished.
    pub batch_size: u32,
}
69
70pub fn handle_receive_chunk_events(
71 mut events: EventReader<ReceiveChunkEvent>,
72 mut query: Query<&mut InstanceHolder>,
73) {
74 for event in events.read() {
75 let pos = ChunkPos::new(event.packet.x, event.packet.z);
76
77 let local_player = query.get_mut(event.entity).unwrap();
78
79 let mut instance = local_player.instance.write();
80 let mut partial_instance = local_player.partial_instance.write();
81
82 let shared_chunk = instance.chunks.get(&pos);
87 let this_client_has_chunk = partial_instance.chunks.limited_get(&pos).is_some();
88
89 if !this_client_has_chunk {
90 if let Some(shared_chunk) = shared_chunk {
91 trace!("Skipping parsing chunk {pos:?} because we already know about it");
92 partial_instance
93 .chunks
94 .limited_set(&pos, Some(shared_chunk));
95 continue;
96 }
97 }
98
99 let heightmaps = &event.packet.chunk_data.heightmaps;
100
101 if let Err(e) = partial_instance.chunks.replace_with_packet_data(
102 &pos,
103 &mut Cursor::new(&event.packet.chunk_data.data),
104 heightmaps,
105 &mut instance.chunks,
106 ) {
107 error!(
108 "Couldn't set chunk data: {e}. World height: {}",
109 instance.chunks.height
110 );
111 }
112 }
113}
114
115impl ChunkBatchInfo {
116 pub fn batch_finished(&mut self, batch_size: u32) {
117 if batch_size == 0 {
118 return;
119 }
120 let batch_duration = self.start_time.elapsed();
121 let duration_per_chunk = batch_duration / batch_size;
122 let clamped_duration = Duration::clamp(
123 duration_per_chunk,
124 self.aggregated_duration_per_chunk / 3,
125 self.aggregated_duration_per_chunk * 3,
126 );
127 self.aggregated_duration_per_chunk =
128 ((self.aggregated_duration_per_chunk * self.old_samples_weight) + clamped_duration)
129 / (self.old_samples_weight + 1);
130 self.old_samples_weight = u32::min(49, self.old_samples_weight + 1);
131 }
132
133 pub fn desired_chunks_per_tick(&self) -> f32 {
134 (7000000. / self.aggregated_duration_per_chunk.as_nanos() as f64) as f32
135 }
136}
137
138pub fn handle_chunk_batch_start_event(
139 mut query: Query<&mut ChunkBatchInfo>,
140 mut events: EventReader<ChunkBatchStartEvent>,
141) {
142 for event in events.read() {
143 if let Ok(mut chunk_batch_info) = query.get_mut(event.entity) {
144 chunk_batch_info.start_time = Instant::now();
145 }
146 }
147}
148
149pub fn handle_chunk_batch_finished_event(
150 mut query: Query<&mut ChunkBatchInfo>,
151 mut events: EventReader<ChunkBatchFinishedEvent>,
152 mut commands: Commands,
153) {
154 for event in events.read() {
155 if let Ok(mut chunk_batch_info) = query.get_mut(event.entity) {
156 chunk_batch_info.batch_finished(event.batch_size);
157 let desired_chunks_per_tick = chunk_batch_info.desired_chunks_per_tick();
158 commands.trigger(SendPacketEvent::new(
159 event.entity,
160 ServerboundChunkBatchReceived {
161 desired_chunks_per_tick,
162 },
163 ));
164 }
165 }
166}
167
/// A fixed-capacity ring buffer of recent chunk-batch samples, used to
/// compute an average milliseconds-per-chunk figure.
#[derive(Clone, Debug)]
pub struct ChunkReceiveSpeedAccumulator {
    /// Number of chunks in each recorded batch.
    batch_sizes: Vec<u32>,
    /// Duration of each recorded batch in milliseconds, clamped to 15s.
    batch_durations: Vec<u32>,
    /// Next slot to overwrite in the ring buffer.
    index: usize,
    /// How many slots hold real samples; grows until it reaches capacity.
    filled_size: usize,
}
impl ChunkReceiveSpeedAccumulator {
    /// Creates an accumulator remembering the last `capacity` batches.
    pub fn new(capacity: usize) -> Self {
        Self {
            batch_sizes: vec![0; capacity],
            batch_durations: vec![0; capacity],
            index: 0,
            filled_size: 0,
        }
    }

    /// Records one batch sample, overwriting the oldest sample when full.
    pub fn accumulate(&mut self, batch_size: u32, batch_duration: Duration) {
        // A zero-capacity accumulator has nowhere to store samples; without
        // this guard the indexing and `% len` below would panic.
        if self.batch_sizes.is_empty() {
            return;
        }
        self.batch_sizes[self.index] = batch_size;
        // Clamp to 15 seconds so one pathological batch can't dominate.
        self.batch_durations[self.index] =
            f32::clamp(batch_duration.as_millis() as f32, 0., 15000.) as u32;
        self.index = (self.index + 1) % self.batch_sizes.len();
        if self.filled_size < self.batch_sizes.len() {
            self.filled_size += 1;
        }
    }

    /// Average milliseconds per chunk over the recorded samples, or `0.` if
    /// no chunks have been recorded yet.
    pub fn get_millis_per_chunk(&self) -> f64 {
        // Sum in u64 so many large u32 samples can't overflow the totals.
        let mut total_batch_size: u64 = 0;
        let mut total_batch_duration: u64 = 0;
        for i in 0..self.filled_size {
            total_batch_size += u64::from(self.batch_sizes[i]);
            total_batch_duration += u64::from(self.batch_durations[i]);
        }
        if total_batch_size == 0 {
            return 0.;
        }
        total_batch_duration as f64 / total_batch_size as f64
    }
}
209
210impl Default for ChunkBatchInfo {
211 fn default() -> Self {
212 Self {
213 start_time: Instant::now(),
214 aggregated_duration_per_chunk: Duration::from_millis(2),
215 old_samples_weight: 1,
216 }
217 }
218}