azalea_client/plugins/
chunks.rs

1//! Used for Minecraft's chunk batching introduced in 23w31a (1.20.2). It's used
2//! for making the server spread out how often it sends us chunk packets
3//! depending on our receiving speed.
4
5use std::{
6    io::Cursor,
7    time::{Duration, Instant},
8};
9
10use azalea_core::position::ChunkPos;
11use azalea_protocol::packets::game::{
12    c_level_chunk_with_light::ClientboundLevelChunkWithLight,
13    s_chunk_batch_received::ServerboundChunkBatchReceived,
14};
15use bevy_app::{App, Plugin, Update};
16use bevy_ecs::prelude::*;
17use tracing::{error, trace};
18
19use crate::{
20    InstanceHolder, interact::handle_block_interact_event, inventory::InventorySet,
21    packet::game::SendPacketEvent, respawn::perform_respawn,
22};
23
24pub struct ChunksPlugin;
25impl Plugin for ChunksPlugin {
26    fn build(&self, app: &mut App) {
27        app.add_systems(
28            Update,
29            (
30                handle_chunk_batch_start_event,
31                handle_receive_chunk_events,
32                handle_chunk_batch_finished_event,
33            )
34                .chain()
35                .before(InventorySet)
36                .before(handle_block_interact_event)
37                .before(perform_respawn),
38        )
39        .add_event::<ReceiveChunkEvent>()
40        .add_event::<ChunkBatchStartEvent>()
41        .add_event::<ChunkBatchFinishedEvent>();
42    }
43}
44
45#[derive(Event)]
46pub struct ReceiveChunkEvent {
47    pub entity: Entity,
48    pub packet: ClientboundLevelChunkWithLight,
49}
50
51#[derive(Component, Clone, Debug)]
52pub struct ChunkBatchInfo {
53    pub start_time: Instant,
54    pub aggregated_duration_per_chunk: Duration,
55    pub old_samples_weight: u32,
56}
57
58#[derive(Event)]
59pub struct ChunkBatchStartEvent {
60    pub entity: Entity,
61}
62#[derive(Event)]
63pub struct ChunkBatchFinishedEvent {
64    pub entity: Entity,
65    pub batch_size: u32,
66}
67
68pub fn handle_receive_chunk_events(
69    mut events: EventReader<ReceiveChunkEvent>,
70    mut query: Query<&mut InstanceHolder>,
71) {
72    for event in events.read() {
73        let pos = ChunkPos::new(event.packet.x, event.packet.z);
74
75        let local_player = query.get_mut(event.entity).unwrap();
76
77        let mut instance = local_player.instance.write();
78        let mut partial_instance = local_player.partial_instance.write();
79
80        // OPTIMIZATION: if we already know about the chunk from the shared world (and
81        // not ourselves), then we don't need to parse it again. This is only used when
82        // we have a shared world, since we check that the chunk isn't currently owned
83        // by this client.
84        let shared_chunk = instance.chunks.get(&pos);
85        let this_client_has_chunk = partial_instance.chunks.limited_get(&pos).is_some();
86
87        if !this_client_has_chunk {
88            if let Some(shared_chunk) = shared_chunk {
89                trace!("Skipping parsing chunk {pos:?} because we already know about it");
90                partial_instance
91                    .chunks
92                    .limited_set(&pos, Some(shared_chunk));
93                continue;
94            }
95        }
96
97        let heightmaps = &event.packet.chunk_data.heightmaps;
98
99        if let Err(e) = partial_instance.chunks.replace_with_packet_data(
100            &pos,
101            &mut Cursor::new(&event.packet.chunk_data.data),
102            heightmaps,
103            &mut instance.chunks,
104        ) {
105            error!(
106                "Couldn't set chunk data: {e}. World height: {}",
107                instance.chunks.height
108            );
109        }
110    }
111}
112
113impl ChunkBatchInfo {
114    pub fn batch_finished(&mut self, batch_size: u32) {
115        if batch_size == 0 {
116            return;
117        }
118        let batch_duration = self.start_time.elapsed();
119        let duration_per_chunk = batch_duration / batch_size;
120        let clamped_duration = Duration::clamp(
121            duration_per_chunk,
122            self.aggregated_duration_per_chunk / 3,
123            self.aggregated_duration_per_chunk * 3,
124        );
125        self.aggregated_duration_per_chunk =
126            ((self.aggregated_duration_per_chunk * self.old_samples_weight) + clamped_duration)
127                / (self.old_samples_weight + 1);
128        self.old_samples_weight = u32::min(49, self.old_samples_weight + 1);
129    }
130
131    pub fn desired_chunks_per_tick(&self) -> f32 {
132        (7000000. / self.aggregated_duration_per_chunk.as_nanos() as f64) as f32
133    }
134}
135
136pub fn handle_chunk_batch_start_event(
137    mut query: Query<&mut ChunkBatchInfo>,
138    mut events: EventReader<ChunkBatchStartEvent>,
139) {
140    for event in events.read() {
141        if let Ok(mut chunk_batch_info) = query.get_mut(event.entity) {
142            chunk_batch_info.start_time = Instant::now();
143        }
144    }
145}
146
147pub fn handle_chunk_batch_finished_event(
148    mut query: Query<&mut ChunkBatchInfo>,
149    mut events: EventReader<ChunkBatchFinishedEvent>,
150    mut commands: Commands,
151) {
152    for event in events.read() {
153        if let Ok(mut chunk_batch_info) = query.get_mut(event.entity) {
154            chunk_batch_info.batch_finished(event.batch_size);
155            let desired_chunks_per_tick = chunk_batch_info.desired_chunks_per_tick();
156            commands.trigger(SendPacketEvent::new(
157                event.entity,
158                ServerboundChunkBatchReceived {
159                    desired_chunks_per_tick,
160                },
161            ));
162        }
163    }
164}
165
166#[derive(Clone, Debug)]
167pub struct ChunkReceiveSpeedAccumulator {
168    batch_sizes: Vec<u32>,
169    /// as milliseconds
170    batch_durations: Vec<u32>,
171    index: usize,
172    filled_size: usize,
173}
174impl ChunkReceiveSpeedAccumulator {
175    pub fn new(capacity: usize) -> Self {
176        Self {
177            batch_sizes: vec![0; capacity],
178            batch_durations: vec![0; capacity],
179            index: 0,
180            filled_size: 0,
181        }
182    }
183
184    pub fn accumulate(&mut self, batch_size: u32, batch_duration: Duration) {
185        self.batch_sizes[self.index] = batch_size;
186        self.batch_durations[self.index] =
187            f32::clamp(batch_duration.as_millis() as f32, 0., 15000.) as u32;
188        self.index = (self.index + 1) % self.batch_sizes.len();
189        if self.filled_size < self.batch_sizes.len() {
190            self.filled_size += 1;
191        }
192    }
193
194    pub fn get_millis_per_chunk(&self) -> f64 {
195        let mut total_batch_size = 0;
196        let mut total_batch_duration = 0;
197        for i in 0..self.filled_size {
198            total_batch_size += self.batch_sizes[i];
199            total_batch_duration += self.batch_durations[i];
200        }
201        if total_batch_size == 0 {
202            return 0.;
203        }
204        total_batch_duration as f64 / total_batch_size as f64
205    }
206}
207
208impl Default for ChunkBatchInfo {
209    fn default() -> Self {
210        Self {
211            start_time: Instant::now(),
212            aggregated_duration_per_chunk: Duration::from_millis(2),
213            old_samples_weight: 1,
214        }
215    }
216}