azalea_client/plugins/
chunks.rs

1//! Used for Minecraft's chunk batching introduced in 23w31a (1.20.2). It's used
2//! for making the server spread out how often it sends us chunk packets
3//! depending on our receiving speed.
4
5use std::{
6    io::Cursor,
7    time::{Duration, Instant},
8};
9
10use azalea_core::position::ChunkPos;
11use azalea_protocol::packets::game::{
12    c_level_chunk_with_light::ClientboundLevelChunkWithLight,
13    s_chunk_batch_received::ServerboundChunkBatchReceived,
14};
15use bevy_app::{App, Plugin, Update};
16use bevy_ecs::prelude::*;
17use tracing::{error, trace};
18
19use super::packet::game::handle_outgoing_packets;
20use crate::{
21    InstanceHolder, interact::handle_block_interact_event, inventory::InventorySet,
22    packet::game::SendPacketEvent, respawn::perform_respawn,
23};
24
25pub struct ChunksPlugin;
26impl Plugin for ChunksPlugin {
27    fn build(&self, app: &mut App) {
28        app.add_systems(
29            Update,
30            (
31                handle_chunk_batch_start_event,
32                handle_receive_chunk_events,
33                handle_chunk_batch_finished_event,
34            )
35                .chain()
36                .before(handle_outgoing_packets)
37                .before(InventorySet)
38                .before(handle_block_interact_event)
39                .before(perform_respawn),
40        )
41        .add_event::<ReceiveChunkEvent>()
42        .add_event::<ChunkBatchStartEvent>()
43        .add_event::<ChunkBatchFinishedEvent>();
44    }
45}
46
/// Event carrying a chunk-with-light packet for `entity`.
///
/// Consumed by [`handle_receive_chunk_events`], which stores the chunk in that
/// entity's `InstanceHolder`.
#[derive(Event)]
pub struct ReceiveChunkEvent {
    /// The entity whose `InstanceHolder` the chunk is written into.
    pub entity: Entity,
    /// The chunk packet; its `x`/`z` give the chunk position and `chunk_data`
    /// holds the heightmaps and the serialized chunk data.
    pub packet: ClientboundLevelChunkWithLight,
}
52
/// Per-entity state used to estimate how long each received chunk takes.
///
/// Updated by [`ChunkBatchInfo::batch_finished`] and read by
/// [`ChunkBatchInfo::desired_chunks_per_tick`].
#[derive(Component, Clone, Debug)]
pub struct ChunkBatchInfo {
    /// When the current batch started; reset to `Instant::now()` by
    /// [`handle_chunk_batch_start_event`].
    pub start_time: Instant,
    /// Weighted running average of the time taken per chunk.
    pub aggregated_duration_per_chunk: Duration,
    /// Weight given to the existing average when a new sample is folded in.
    /// It grows by one per non-empty finished batch and is capped at 49.
    pub old_samples_weight: u32,
}
59
/// Event signalling that a chunk batch has started for `entity`.
///
/// Consumed by [`handle_chunk_batch_start_event`], which resets the entity's
/// [`ChunkBatchInfo::start_time`].
#[derive(Event)]
pub struct ChunkBatchStartEvent {
    /// The entity whose batch timer should be restarted.
    pub entity: Entity,
}
/// Event signalling that a chunk batch has finished for `entity`.
///
/// Consumed by [`handle_chunk_batch_finished_event`], which updates the
/// entity's [`ChunkBatchInfo`] and sends a `ServerboundChunkBatchReceived`
/// packet.
#[derive(Event)]
pub struct ChunkBatchFinishedEvent {
    /// The entity whose batch finished.
    pub entity: Entity,
    /// Number of chunks in the finished batch; a value of 0 leaves the running
    /// average unchanged.
    pub batch_size: u32,
}
69
70pub fn handle_receive_chunk_events(
71    mut events: EventReader<ReceiveChunkEvent>,
72    mut query: Query<&mut InstanceHolder>,
73) {
74    for event in events.read() {
75        let pos = ChunkPos::new(event.packet.x, event.packet.z);
76
77        let local_player = query.get_mut(event.entity).unwrap();
78
79        let mut instance = local_player.instance.write();
80        let mut partial_instance = local_player.partial_instance.write();
81
82        // OPTIMIZATION: if we already know about the chunk from the shared world (and
83        // not ourselves), then we don't need to parse it again. This is only used when
84        // we have a shared world, since we check that the chunk isn't currently owned
85        // by this client.
86        let shared_chunk = instance.chunks.get(&pos);
87        let this_client_has_chunk = partial_instance.chunks.limited_get(&pos).is_some();
88
89        if !this_client_has_chunk {
90            if let Some(shared_chunk) = shared_chunk {
91                trace!("Skipping parsing chunk {pos:?} because we already know about it");
92                partial_instance
93                    .chunks
94                    .limited_set(&pos, Some(shared_chunk));
95                continue;
96            }
97        }
98
99        let heightmaps = &event.packet.chunk_data.heightmaps;
100
101        if let Err(e) = partial_instance.chunks.replace_with_packet_data(
102            &pos,
103            &mut Cursor::new(&event.packet.chunk_data.data),
104            heightmaps,
105            &mut instance.chunks,
106        ) {
107            error!(
108                "Couldn't set chunk data: {e}. World height: {}",
109                instance.chunks.height
110            );
111        }
112    }
113}
114
115impl ChunkBatchInfo {
116    pub fn batch_finished(&mut self, batch_size: u32) {
117        if batch_size == 0 {
118            return;
119        }
120        let batch_duration = self.start_time.elapsed();
121        let duration_per_chunk = batch_duration / batch_size;
122        let clamped_duration = Duration::clamp(
123            duration_per_chunk,
124            self.aggregated_duration_per_chunk / 3,
125            self.aggregated_duration_per_chunk * 3,
126        );
127        self.aggregated_duration_per_chunk =
128            ((self.aggregated_duration_per_chunk * self.old_samples_weight) + clamped_duration)
129                / (self.old_samples_weight + 1);
130        self.old_samples_weight = u32::min(49, self.old_samples_weight + 1);
131    }
132
133    pub fn desired_chunks_per_tick(&self) -> f32 {
134        (7000000. / self.aggregated_duration_per_chunk.as_nanos() as f64) as f32
135    }
136}
137
138pub fn handle_chunk_batch_start_event(
139    mut query: Query<&mut ChunkBatchInfo>,
140    mut events: EventReader<ChunkBatchStartEvent>,
141) {
142    for event in events.read() {
143        if let Ok(mut chunk_batch_info) = query.get_mut(event.entity) {
144            chunk_batch_info.start_time = Instant::now();
145        }
146    }
147}
148
149pub fn handle_chunk_batch_finished_event(
150    mut query: Query<&mut ChunkBatchInfo>,
151    mut events: EventReader<ChunkBatchFinishedEvent>,
152    mut commands: Commands,
153) {
154    for event in events.read() {
155        if let Ok(mut chunk_batch_info) = query.get_mut(event.entity) {
156            chunk_batch_info.batch_finished(event.batch_size);
157            let desired_chunks_per_tick = chunk_batch_info.desired_chunks_per_tick();
158            commands.trigger(SendPacketEvent::new(
159                event.entity,
160                ServerboundChunkBatchReceived {
161                    desired_chunks_per_tick,
162                },
163            ));
164        }
165    }
166}
167
/// Fixed-capacity ring buffer of recent chunk batch samples, used to compute
/// the average number of milliseconds spent per chunk.
#[derive(Clone, Debug)]
pub struct ChunkReceiveSpeedAccumulator {
    /// Number of chunks in each recorded batch.
    batch_sizes: Vec<u32>,
    /// Duration of each recorded batch, as milliseconds (at most 15000).
    batch_durations: Vec<u32>,
    /// Slot that the next sample is written to.
    index: usize,
    /// How many slots currently hold a recorded sample.
    filled_size: usize,
}

impl ChunkReceiveSpeedAccumulator {
    /// Longest batch duration that's recorded; longer ones are clamped to this.
    const MAX_BATCH_DURATION_MILLIS: u32 = 15_000;

    /// Creates an accumulator that remembers the last `capacity` batches.
    ///
    /// With a `capacity` of 0 no samples are kept:
    /// [`accumulate`](Self::accumulate) does nothing and
    /// [`get_millis_per_chunk`](Self::get_millis_per_chunk) always returns `0.`.
    pub fn new(capacity: usize) -> Self {
        Self {
            batch_sizes: vec![0; capacity],
            batch_durations: vec![0; capacity],
            index: 0,
            filled_size: 0,
        }
    }

    /// Records a batch of `batch_size` chunks that took `batch_duration`,
    /// overwriting the oldest sample once the buffer is full.
    ///
    /// The duration is stored in whole milliseconds, clamped to 15 seconds.
    pub fn accumulate(&mut self, batch_size: u32, batch_duration: Duration) {
        let capacity = self.batch_sizes.len();
        if capacity == 0 {
            // Nothing to write into; indexing or `% 0` below would panic.
            return;
        }

        // Durations too long for a `u32` are necessarily above the limit.
        let millis = u32::try_from(batch_duration.as_millis())
            .map_or(Self::MAX_BATCH_DURATION_MILLIS, |ms| {
                ms.min(Self::MAX_BATCH_DURATION_MILLIS)
            });

        self.batch_sizes[self.index] = batch_size;
        self.batch_durations[self.index] = millis;
        self.index = (self.index + 1) % capacity;
        self.filled_size = (self.filled_size + 1).min(capacity);
    }

    /// Returns the total recorded milliseconds divided by the total recorded
    /// chunk count, or `0.` if no chunks have been recorded.
    pub fn get_millis_per_chunk(&self) -> f64 {
        // Sum in `u64` so that large batch sizes can't overflow the total.
        let total_batch_size: u64 = self
            .batch_sizes
            .iter()
            .take(self.filled_size)
            .map(|&size| u64::from(size))
            .sum();
        if total_batch_size == 0 {
            return 0.;
        }

        let total_batch_duration: u64 = self
            .batch_durations
            .iter()
            .take(self.filled_size)
            .map(|&millis| u64::from(millis))
            .sum();

        total_batch_duration as f64 / total_batch_size as f64
    }
}
209
210impl Default for ChunkBatchInfo {
211    fn default() -> Self {
212        Self {
213            start_time: Instant::now(),
214            aggregated_duration_per_chunk: Duration::from_millis(2),
215            old_samples_weight: 1,
216        }
217    }
218}