azalea_client/plugins/
chunks.rs

1//! Used for Minecraft's chunk batching introduced in 23w31a (1.20.2). It's used
2//! for making the server spread out how often it sends us chunk packets
3//! depending on our receiving speed.
4
5use std::{
6    io::Cursor,
7    time::{Duration, Instant},
8};
9
10use azalea_core::position::ChunkPos;
11use azalea_protocol::packets::game::{
12    c_level_chunk_with_light::ClientboundLevelChunkWithLight,
13    s_chunk_batch_received::ServerboundChunkBatchReceived,
14};
15use bevy_app::{App, Plugin, Update};
16use bevy_ecs::prelude::*;
17use tracing::{error, trace};
18
19use crate::{
20    inventory::InventorySet, local_player::InstanceHolder, packet::game::SendPacketEvent,
21    respawn::perform_respawn,
22};
23
24pub struct ChunksPlugin;
25impl Plugin for ChunksPlugin {
26    fn build(&self, app: &mut App) {
27        app.add_systems(
28            Update,
29            (
30                handle_chunk_batch_start_event,
31                handle_receive_chunk_event,
32                handle_chunk_batch_finished_event,
33            )
34                .chain()
35                .before(InventorySet)
36                .before(perform_respawn),
37        )
38        .add_event::<ReceiveChunkEvent>()
39        .add_event::<ChunkBatchStartEvent>()
40        .add_event::<ChunkBatchFinishedEvent>();
41    }
42}
43
44#[derive(Event)]
45pub struct ReceiveChunkEvent {
46    pub entity: Entity,
47    pub packet: ClientboundLevelChunkWithLight,
48}
49
50#[derive(Component, Clone, Debug)]
51pub struct ChunkBatchInfo {
52    pub start_time: Instant,
53    pub aggregated_duration_per_chunk: Duration,
54    pub old_samples_weight: u32,
55}
56
57#[derive(Event)]
58pub struct ChunkBatchStartEvent {
59    pub entity: Entity,
60}
61#[derive(Event)]
62pub struct ChunkBatchFinishedEvent {
63    pub entity: Entity,
64    pub batch_size: u32,
65}
66
67pub fn handle_receive_chunk_event(
68    mut events: EventReader<ReceiveChunkEvent>,
69    mut query: Query<&InstanceHolder>,
70) {
71    for event in events.read() {
72        let pos = ChunkPos::new(event.packet.x, event.packet.z);
73
74        let local_player = query.get_mut(event.entity).unwrap();
75
76        let mut instance = local_player.instance.write();
77        let mut partial_instance = local_player.partial_instance.write();
78
79        // OPTIMIZATION: if we already know about the chunk from the shared world (and
80        // not ourselves), then we don't need to parse it again. This is only used when
81        // we have a shared world, since we check that the chunk isn't currently owned
82        // by this client.
83        let shared_chunk = instance.chunks.get(&pos);
84        let this_client_has_chunk = partial_instance.chunks.limited_get(&pos).is_some();
85
86        if !this_client_has_chunk && let Some(shared_chunk) = shared_chunk {
87            trace!("Skipping parsing chunk {pos:?} because we already know about it");
88            partial_instance
89                .chunks
90                .limited_set(&pos, Some(shared_chunk));
91            continue;
92        }
93
94        let heightmaps = &event.packet.chunk_data.heightmaps;
95
96        if let Err(e) = partial_instance.chunks.replace_with_packet_data(
97            &pos,
98            &mut Cursor::new(&event.packet.chunk_data.data),
99            heightmaps,
100            &mut instance.chunks,
101        ) {
102            error!(
103                "Couldn't set chunk data: {e}. World height: {}",
104                instance.chunks.height
105            );
106        }
107    }
108}
109
110impl ChunkBatchInfo {
111    pub fn batch_finished(&mut self, batch_size: u32) {
112        if batch_size == 0 {
113            return;
114        }
115        let batch_duration = self.start_time.elapsed();
116        let duration_per_chunk = batch_duration / batch_size;
117        let clamped_duration = Duration::clamp(
118            duration_per_chunk,
119            self.aggregated_duration_per_chunk / 3,
120            self.aggregated_duration_per_chunk * 3,
121        );
122        self.aggregated_duration_per_chunk =
123            ((self.aggregated_duration_per_chunk * self.old_samples_weight) + clamped_duration)
124                / (self.old_samples_weight + 1);
125        self.old_samples_weight = u32::min(49, self.old_samples_weight + 1);
126    }
127
128    pub fn desired_chunks_per_tick(&self) -> f32 {
129        (7000000. / self.aggregated_duration_per_chunk.as_nanos() as f64) as f32
130    }
131}
132
133pub fn handle_chunk_batch_start_event(
134    mut query: Query<&mut ChunkBatchInfo>,
135    mut events: EventReader<ChunkBatchStartEvent>,
136) {
137    for event in events.read() {
138        if let Ok(mut chunk_batch_info) = query.get_mut(event.entity) {
139            chunk_batch_info.start_time = Instant::now();
140        }
141    }
142}
143
144pub fn handle_chunk_batch_finished_event(
145    mut query: Query<&mut ChunkBatchInfo>,
146    mut events: EventReader<ChunkBatchFinishedEvent>,
147    mut commands: Commands,
148) {
149    for event in events.read() {
150        if let Ok(mut chunk_batch_info) = query.get_mut(event.entity) {
151            chunk_batch_info.batch_finished(event.batch_size);
152            let desired_chunks_per_tick = chunk_batch_info.desired_chunks_per_tick();
153            commands.trigger(SendPacketEvent::new(
154                event.entity,
155                ServerboundChunkBatchReceived {
156                    desired_chunks_per_tick,
157                },
158            ));
159        }
160    }
161}
162
163#[derive(Clone, Debug)]
164pub struct ChunkReceiveSpeedAccumulator {
165    batch_sizes: Vec<u32>,
166    /// as milliseconds
167    batch_durations: Vec<u32>,
168    index: usize,
169    filled_size: usize,
170}
171impl ChunkReceiveSpeedAccumulator {
172    pub fn new(capacity: usize) -> Self {
173        Self {
174            batch_sizes: vec![0; capacity],
175            batch_durations: vec![0; capacity],
176            index: 0,
177            filled_size: 0,
178        }
179    }
180
181    pub fn accumulate(&mut self, batch_size: u32, batch_duration: Duration) {
182        self.batch_sizes[self.index] = batch_size;
183        self.batch_durations[self.index] =
184            f32::clamp(batch_duration.as_millis() as f32, 0., 15000.) as u32;
185        self.index = (self.index + 1) % self.batch_sizes.len();
186        if self.filled_size < self.batch_sizes.len() {
187            self.filled_size += 1;
188        }
189    }
190
191    pub fn get_millis_per_chunk(&self) -> f64 {
192        let mut total_batch_size = 0;
193        let mut total_batch_duration = 0;
194        for i in 0..self.filled_size {
195            total_batch_size += self.batch_sizes[i];
196            total_batch_duration += self.batch_durations[i];
197        }
198        if total_batch_size == 0 {
199            return 0.;
200        }
201        total_batch_duration as f64 / total_batch_size as f64
202    }
203}
204
205impl Default for ChunkBatchInfo {
206    fn default() -> Self {
207        Self {
208            start_time: Instant::now(),
209            aggregated_duration_per_chunk: Duration::from_millis(2),
210            old_samples_weight: 1,
211        }
212    }
213}