azalea/swarm/
chat.rs

1//! Implements `SwarmEvent::Chat`.
2
3// How the chat event works (to avoid firing the event multiple times):
4// ---
5// There's a shared queue of all the chat messages
6// Each bot contains an index of the farthest message we've seen
7// When a bot receives a chat message, it looks into the queue to find the
8// earliest instance of the message content that's after the bot's chat index.
9// If it finds it, then its personal index is simply updated. Otherwise, fire
10// the event and add to the queue.
11//
12// To make sure the queue doesn't grow too large, we keep a `chat_min_index`
13// in Swarm that's set to the smallest index of all the bots, and we remove all
14// messages from the queue that are before that index.
15
16use std::collections::VecDeque;
17
18use azalea_client::chat::{ChatPacket, ChatReceivedEvent};
19use bevy_app::{App, Plugin, Update};
20use bevy_ecs::prelude::Event;
21
22use super::{Swarm, SwarmEvent};
23use crate::ecs::{
24    component::Component,
25    event::{EventReader, EventWriter},
26    schedule::IntoSystemConfigs,
27    system::{Commands, Query, Res, ResMut, Resource},
28};
29
30#[derive(Clone)]
31pub struct SwarmChatPlugin;
32impl Plugin for SwarmChatPlugin {
33    fn build(&self, app: &mut App) {
34        app.add_event::<NewChatMessageEvent>()
35            .add_systems(
36                Update,
37                (chat_listener, update_min_index_and_shrink_queue).chain(),
38            )
39            .insert_resource(GlobalChatState {
40                chat_queue: VecDeque::new(),
41                chat_min_index: 0,
42            });
43    }
44}
45
46#[derive(Component, Debug)]
47pub struct ClientChatState {
48    pub chat_index: usize,
49}
50
51/// A chat message that no other bots have seen yet was received by a bot.
52#[derive(Event, Debug)]
53pub struct NewChatMessageEvent(ChatPacket);
54
55#[derive(Resource)]
56pub struct GlobalChatState {
57    pub chat_queue: VecDeque<ChatPacket>,
58    pub chat_min_index: usize,
59}
60
61fn chat_listener(
62    mut commands: Commands,
63    mut query: Query<&mut ClientChatState>,
64    mut events: EventReader<ChatReceivedEvent>,
65    mut global_chat_state: ResMut<GlobalChatState>,
66    mut new_chat_messages_events: EventWriter<NewChatMessageEvent>,
67) {
68    for event in events.read() {
69        let mut client_chat_state = query.get_mut(event.entity);
70        let mut client_chat_index = if let Ok(ref client_chat_state) = client_chat_state {
71            client_chat_state.chat_index
72        } else {
73            // if the client has no chat state, we default to this and insert it at the end
74            global_chat_state.chat_min_index
75        };
76
77        // When a bot receives a chat messages, it looks into the queue to find the
78        // earliest instance of the message content that's after the bot's chat index.
79        // If it finds it, then its personal index is simply updated. Otherwise, fire
80        // the event and add to the queue.
81
82        let actual_vec_index = client_chat_index - global_chat_state.chat_min_index;
83
84        // go through the queue and find the first message that's after the bot's index
85        let mut found = false;
86        for (i, past_message) in global_chat_state
87            .chat_queue
88            .iter()
89            .enumerate()
90            .skip(actual_vec_index)
91        {
92            if past_message == &event.packet {
93                // found the message, update the index
94                client_chat_index = i + global_chat_state.chat_min_index + 1;
95                found = true;
96                break;
97            }
98        }
99
100        if !found {
101            // didn't find the message, so fire the swarm event and add to the queue
102            new_chat_messages_events.send(NewChatMessageEvent(event.packet.clone()));
103            global_chat_state.chat_queue.push_back(event.packet.clone());
104            client_chat_index =
105                global_chat_state.chat_queue.len() + global_chat_state.chat_min_index;
106        }
107        if let Ok(ref mut client_chat_state) = client_chat_state {
108            client_chat_state.chat_index = client_chat_index;
109        } else {
110            commands.entity(event.entity).insert(ClientChatState {
111                chat_index: client_chat_index,
112            });
113        }
114    }
115}
116
117fn update_min_index_and_shrink_queue(
118    query: Query<&ClientChatState>,
119    mut global_chat_state: ResMut<GlobalChatState>,
120    mut events: EventReader<NewChatMessageEvent>,
121    swarm: Option<Res<Swarm>>,
122) {
123    for event in events.read() {
124        if let Some(swarm) = &swarm {
125            // it should also work if Swarm isn't present (so the tests don't need it)
126            swarm
127                .swarm_tx
128                .send(SwarmEvent::Chat(event.0.clone()))
129                .unwrap();
130        }
131        // To make sure the queue doesn't grow too large, we keep a `chat_min_index`
132        // in Swarm that's set to the smallest index of all the bots, and we remove all
133        // messages from the queue that are before that index.
134
135        let mut new_chat_min_index = global_chat_state.chat_min_index;
136        for client_chat_state in query.iter() {
137            let this_chat_index = client_chat_state.chat_index;
138            if this_chat_index < new_chat_min_index {
139                new_chat_min_index = this_chat_index;
140            }
141        }
142
143        if global_chat_state.chat_min_index > new_chat_min_index {
144            return;
145        }
146        // remove all messages from the queue that are before the min index
147        for _ in 0..(new_chat_min_index - global_chat_state.chat_min_index) {
148            global_chat_state.chat_queue.pop_front();
149        }
150
151        // update the min index
152        global_chat_state.chat_min_index = new_chat_min_index;
153    }
154}
155
#[cfg(test)]
mod tests {
    use bevy_ecs::{
        event::Events,
        prelude::{Entity, World},
    };

    use super::*;

    /// Builds an app that contains only the two chat systems and their state.
    fn make_test_app() -> App {
        let mut app = App::new();
        // the event resources are initialized by hand instead of through
        // `.add_event`, so the tests decide when events get consumed (see
        // `drain_events`)
        app.init_resource::<Events<ChatReceivedEvent>>();
        app.init_resource::<Events<NewChatMessageEvent>>();
        app.add_systems(
            Update,
            (chat_listener, update_min_index_and_shrink_queue).chain(),
        );
        app.insert_resource(GlobalChatState {
            chat_queue: VecDeque::new(),
            chat_min_index: 0,
        });
        app
    }

    /// Removes and returns the packets of all pending `NewChatMessageEvent`s.
    fn drain_events(ecs: &mut World) -> Vec<ChatPacket> {
        ecs.resource_mut::<Events<NewChatMessageEvent>>()
            .drain()
            .map(|NewChatMessageEvent(packet)| packet)
            .collect()
    }

    /// Makes `bot` receive `text` as a chat message and runs a single update.
    fn receive_chat(app: &mut App, bot: Entity, text: &str) {
        app.world_mut().send_event(ChatReceivedEvent {
            entity: bot,
            packet: ChatPacket::new(text),
        });
        app.update();
    }

    /// Reads the chat index currently stored on `bot`.
    fn chat_index_of(app: &App, bot: Entity) -> usize {
        app.world().get::<ClientChatState>(bot).unwrap().chat_index
    }

    #[tokio::test]
    async fn test_swarm_chat() {
        let mut app = make_test_app();

        let bot0 = app.world_mut().spawn_empty().id();
        let bot1 = app.world_mut().spawn_empty().id();

        // the swarm should get the event immediately after the first bot gets it
        receive_chat(&mut app, bot0, "a");
        assert_eq!(drain_events(app.world_mut()), vec![ChatPacket::new("a")]);
        assert_eq!(chat_index_of(&app, bot0), 1);

        // the same message arriving at a second bot shouldn't fire anything
        receive_chat(&mut app, bot1, "a");
        assert_eq!(drain_events(app.world_mut()), vec![]);
        assert_eq!(chat_index_of(&app, bot1), 1);

        // but if the first bot gets it again, it should be sent again
        receive_chat(&mut app, bot0, "a");
        assert_eq!(drain_events(app.world_mut()), vec![ChatPacket::new("a")]);

        // and a different message received by the second bot should be sent too
        receive_chat(&mut app, bot1, "b");
        assert_eq!(drain_events(app.world_mut()), vec![ChatPacket::new("b")]);
    }

    #[tokio::test]
    async fn test_new_bot() {
        let mut app = make_test_app();

        // bot0 gets a chat message
        let bot0 = app.world_mut().spawn_empty().id();
        receive_chat(&mut app, bot0, "a");
        assert_eq!(drain_events(app.world_mut()), vec![ChatPacket::new("a")]);

        // a bot spawned afterwards receives a different message
        let bot1 = app.world_mut().spawn_empty().id();
        receive_chat(&mut app, bot1, "b");
        assert_eq!(drain_events(app.world_mut()), vec![ChatPacket::new("b")]);
    }
}