azalea_client/chunks.rs

//! Handles Minecraft's chunk batching, introduced in 23w31a (1.20.2). It lets
//! the server spread out how often it sends us chunk packets based on how
//! quickly we're receiving them.

use std::{
    io::Cursor,
    ops::Deref,
    time::{Duration, Instant},
};

use azalea_core::position::ChunkPos;
use azalea_protocol::packets::game::{
    c_level_chunk_with_light::ClientboundLevelChunkWithLight,
    s_chunk_batch_received::ServerboundChunkBatchReceived,
};
use bevy_app::{App, Plugin, Update};
use bevy_ecs::prelude::*;
use simdnbt::owned::BaseNbt;
use tracing::{error, trace};

use crate::{
    interact::handle_block_interact_event,
    inventory::InventorySet,
    packet_handling::game::{handle_send_packet_event, SendPacketEvent},
    respawn::perform_respawn,
    InstanceHolder,
};

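/// The plugin that registers the chunk receiving and chunk batching systems
/// and events.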
pub struct ChunkPlugin;
impl Plugin for ChunkPlugin {
    fn build(&self, app: &mut App) {
        app.add_systems(
            Update,
            (
                handle_chunk_batch_start_event,
                handle_receive_chunk_events,
                handle_chunk_batch_finished_event,
            )
                .chain()
                .before(handle_send_packet_event)
                .before(InventorySet)
                .before(handle_block_interact_event)
                .before(perform_respawn),
        )
        .add_event::<ReceiveChunkEvent>()
        .add_event::<ChunkBatchStartEvent>()
        .add_event::<ChunkBatchFinishedEvent>();
    }
}

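/// An event that's sent when we get a [`ClientboundLevelChunkWithLight`]
/// packet with a chunk's block and light data.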
#[derive(Event)]
pub struct ReceiveChunkEvent {
    pub entity: Entity,
    pub packet: ClientboundLevelChunkWithLight,
}

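/// A component that keeps a weighted running average of how long we spend
/// processing each chunk in a batch, used to decide how many chunks per tick
/// to request from the server.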
#[derive(Component, Clone, Debug)]
pub struct ChunkBatchInfo {
    pub start_time: Instant,
    pub aggregated_duration_per_chunk: Duration,
    pub old_samples_weight: u32,
}

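/// An event that's sent when the server tells us a chunk batch is starting.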
#[derive(Event)]
pub struct ChunkBatchStartEvent {
    pub entity: Entity,
}
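/// An event that's sent when the server tells us a chunk batch is done, along
/// with how many chunks were in it.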
#[derive(Event)]
pub struct ChunkBatchFinishedEvent {
    pub entity: Entity,
    pub batch_size: u32,
}

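/// Parses incoming chunk packets and stores the chunks in the client's world.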
pub fn handle_receive_chunk_events(
    mut events: EventReader<ReceiveChunkEvent>,
    mut query: Query<&mut InstanceHolder>,
) {
    for event in events.read() {
        let pos = ChunkPos::new(event.packet.x, event.packet.z);

        let local_player = query.get_mut(event.entity).unwrap();

        let mut instance = local_player.instance.write();
        let mut partial_instance = local_player.partial_instance.write();

        // OPTIMIZATION: if we already know about the chunk from the shared world (and
        // not ourselves), then we don't need to parse it again. This is only used when
        // we have a shared world, since we check that the chunk isn't currently owned
        // by this client.
        let shared_chunk = instance.chunks.get(&pos);
        let this_client_has_chunk = partial_instance.chunks.limited_get(&pos).is_some();

        if !this_client_has_chunk {
            if let Some(shared_chunk) = shared_chunk {
                trace!("Skipping parsing chunk {pos:?} because we already know about it");
                partial_instance
                    .chunks
                    .limited_set(&pos, Some(shared_chunk));
                continue;
            }
        }

        let heightmaps_nbt = &event.packet.chunk_data.heightmaps;
        // bound to a variable so the reference we pass to unwrap_or lives long
        // enough
        let empty_nbt = BaseNbt::default();
        let heightmaps = heightmaps_nbt.unwrap_or(&empty_nbt).deref();

        if let Err(e) = partial_instance.chunks.replace_with_packet_data(
            &pos,
            &mut Cursor::new(&event.packet.chunk_data.data),
            heightmaps,
            &mut instance.chunks,
        ) {
            error!(
                "Couldn't set chunk data: {e}. World height: {}",
                instance.chunks.height
            );
        }
    }
}

impl ChunkBatchInfo {
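    /// Fold the timing of a finished batch of `batch_size` chunks into the
    /// running average.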
    pub fn batch_finished(&mut self, batch_size: u32) {
        if batch_size == 0 {
            return;
        }
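        // Update the running average of time spent per chunk. The new sample
        // is clamped to between 1/3x and 3x of the current average so one
        // outlier batch can't swing the estimate too far, and the weight of
        // old samples is capped at 49 so the average keeps adapting.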
        let batch_duration = self.start_time.elapsed();
        let duration_per_chunk = batch_duration / batch_size;
        let clamped_duration = Duration::clamp(
            duration_per_chunk,
            self.aggregated_duration_per_chunk / 3,
            self.aggregated_duration_per_chunk * 3,
        );
        self.aggregated_duration_per_chunk =
            ((self.aggregated_duration_per_chunk * self.old_samples_weight) + clamped_duration)
                / (self.old_samples_weight + 1);
        self.old_samples_weight = u32::min(49, self.old_samples_weight + 1);
    }

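    /// The number of chunks per tick we want the server to send us, based on
    /// how long we've been taking to process each chunk. The constant is in
    /// nanoseconds: 7,000,000ns is a budget of 7ms of chunk processing per
    /// tick.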
    pub fn desired_chunks_per_tick(&self) -> f32 {
        (7_000_000. / self.aggregated_duration_per_chunk.as_nanos() as f64) as f32
    }
}

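/// Records when a chunk batch started arriving, so we can measure how long
/// the whole batch took once the matching [`ChunkBatchFinishedEvent`] comes in.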
pub fn handle_chunk_batch_start_event(
    mut query: Query<&mut ChunkBatchInfo>,
    mut events: EventReader<ChunkBatchStartEvent>,
) {
    for event in events.read() {
        if let Ok(mut chunk_batch_info) = query.get_mut(event.entity) {
            chunk_batch_info.start_time = Instant::now();
        }
    }
}

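/// Updates our per-chunk timing with the finished batch and replies with a
/// [`ServerboundChunkBatchReceived`] telling the server how many chunks per
/// tick we'd like to receive.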
pub fn handle_chunk_batch_finished_event(
    mut query: Query<&mut ChunkBatchInfo>,
    mut events: EventReader<ChunkBatchFinishedEvent>,
    mut send_packets: EventWriter<SendPacketEvent>,
) {
    for event in events.read() {
        if let Ok(mut chunk_batch_info) = query.get_mut(event.entity) {
            chunk_batch_info.batch_finished(event.batch_size);
            let desired_chunks_per_tick = chunk_batch_info.desired_chunks_per_tick();
            send_packets.send(SendPacketEvent::new(
                event.entity,
                ServerboundChunkBatchReceived {
                    desired_chunks_per_tick,
                },
            ));
        }
    }
}

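/// A fixed-size ring buffer of recent batch sizes and durations, used to
/// estimate the average number of milliseconds spent per chunk.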
#[derive(Clone, Debug)]
pub struct ChunkReceiveSpeedAccumulator {
    batch_sizes: Vec<u32>,
    /// as milliseconds
    batch_durations: Vec<u32>,
    index: usize,
    filled_size: usize,
}
impl ChunkReceiveSpeedAccumulator {
    pub fn new(capacity: usize) -> Self {
        Self {
            batch_sizes: vec![0; capacity],
            batch_durations: vec![0; capacity],
            index: 0,
            filled_size: 0,
        }
    }

    pub fn accumulate(&mut self, batch_size: u32, batch_duration: Duration) {
        self.batch_sizes[self.index] = batch_size;
        self.batch_durations[self.index] =
            f32::clamp(batch_duration.as_millis() as f32, 0., 15000.) as u32;
        self.index = (self.index + 1) % self.batch_sizes.len();
        if self.filled_size < self.batch_sizes.len() {
            self.filled_size += 1;
        }
    }

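    /// The average number of milliseconds we've spent per chunk across the
    /// recorded batches, or 0 if no chunks have been accumulated yet.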
    pub fn get_millis_per_chunk(&self) -> f64 {
        let mut total_batch_size = 0;
        let mut total_batch_duration = 0;
        for i in 0..self.filled_size {
            total_batch_size += self.batch_sizes[i];
            total_batch_duration += self.batch_durations[i];
        }
        if total_batch_size == 0 {
            return 0.;
        }
        total_batch_duration as f64 / total_batch_size as f64
    }
}

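// Assume 2 milliseconds per chunk (with a low weight) until we have real
// measurements.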
impl Default for ChunkBatchInfo {
    fn default() -> Self {
        Self {
            start_time: Instant::now(),
            aggregated_duration_per_chunk: Duration::from_millis(2),
            old_samples_weight: 1,
        }
    }
}