azalea/swarm/
chat.rs

//! Implements `SwarmEvent::Chat`.

// How the chat event works (to avoid firing the event multiple times):
// ---
// There's a shared queue of all the chat messages.
// Each bot contains an index of the farthest message it has seen.
// When a bot receives a chat message, it looks into the queue to find the
// earliest instance of the message content that's after the bot's chat index.
// If it finds it, then its personal index is simply updated. Otherwise, fire
// the event and add to the queue.
//
// To make sure the queue doesn't grow too large, we keep a `chat_min_index`
// in `GlobalChatState` that's meant to track the smallest index of all the
// bots, and we remove all messages from the queue that are before that index.
15
16use std::collections::VecDeque;
17
18use azalea_client::chat::{ChatPacket, ChatReceivedEvent};
19use bevy_app::{App, Plugin, Update};
20use bevy_ecs::prelude::Event;
21
22use super::{Swarm, SwarmEvent};
23use crate::ecs::prelude::*;
24
25#[derive(Clone)]
26pub struct SwarmChatPlugin;
27impl Plugin for SwarmChatPlugin {
28    fn build(&self, app: &mut App) {
29        app.add_event::<NewChatMessageEvent>()
30            .add_systems(
31                Update,
32                (chat_listener, update_min_index_and_shrink_queue).chain(),
33            )
34            .insert_resource(GlobalChatState {
35                chat_queue: VecDeque::new(),
36                chat_min_index: 0,
37            });
38    }
39}
40
41#[derive(Component, Debug)]
42pub struct ClientChatState {
43    pub chat_index: usize,
44}
45
46/// A chat message that no other bots have seen yet was received by a bot.
47#[derive(Event, Debug)]
48pub struct NewChatMessageEvent(ChatPacket);
49
50#[derive(Resource)]
51pub struct GlobalChatState {
52    pub chat_queue: VecDeque<ChatPacket>,
53    pub chat_min_index: usize,
54}
55
56fn chat_listener(
57    mut commands: Commands,
58    mut query: Query<&mut ClientChatState>,
59    mut events: EventReader<ChatReceivedEvent>,
60    mut global_chat_state: ResMut<GlobalChatState>,
61    mut new_chat_messages_events: EventWriter<NewChatMessageEvent>,
62) {
63    for event in events.read() {
64        let mut client_chat_state = query.get_mut(event.entity);
65        let mut client_chat_index = if let Ok(ref client_chat_state) = client_chat_state {
66            client_chat_state.chat_index
67        } else {
68            // if the client has no chat state, we default to this and insert it at the end
69            global_chat_state.chat_min_index
70        };
71
72        // When a bot receives a chat messages, it looks into the queue to find the
73        // earliest instance of the message content that's after the bot's chat index.
74        // If it finds it, then its personal index is simply updated. Otherwise, fire
75        // the event and add to the queue.
76
77        let actual_vec_index = client_chat_index - global_chat_state.chat_min_index;
78
79        // go through the queue and find the first message that's after the bot's index
80        let mut found = false;
81        for (i, past_message) in global_chat_state
82            .chat_queue
83            .iter()
84            .enumerate()
85            .skip(actual_vec_index)
86        {
87            if past_message == &event.packet {
88                // found the message, update the index
89                client_chat_index = i + global_chat_state.chat_min_index + 1;
90                found = true;
91                break;
92            }
93        }
94
95        if !found {
96            // didn't find the message, so fire the swarm event and add to the queue
97            new_chat_messages_events.write(NewChatMessageEvent(event.packet.clone()));
98            global_chat_state.chat_queue.push_back(event.packet.clone());
99            client_chat_index =
100                global_chat_state.chat_queue.len() + global_chat_state.chat_min_index;
101        }
102        if let Ok(ref mut client_chat_state) = client_chat_state {
103            client_chat_state.chat_index = client_chat_index;
104        } else {
105            commands.entity(event.entity).insert(ClientChatState {
106                chat_index: client_chat_index,
107            });
108        }
109    }
110}
111
112fn update_min_index_and_shrink_queue(
113    query: Query<&ClientChatState>,
114    mut global_chat_state: ResMut<GlobalChatState>,
115    mut events: EventReader<NewChatMessageEvent>,
116    swarm: Option<Res<Swarm>>,
117) {
118    for event in events.read() {
119        if let Some(swarm) = &swarm {
120            // it should also work if Swarm isn't present (so the tests don't need it)
121            swarm
122                .swarm_tx
123                .send(SwarmEvent::Chat(event.0.clone()))
124                .unwrap();
125        }
126        // To make sure the queue doesn't grow too large, we keep a `chat_min_index`
127        // in Swarm that's set to the smallest index of all the bots, and we remove all
128        // messages from the queue that are before that index.
129
130        let mut new_chat_min_index = global_chat_state.chat_min_index;
131        for client_chat_state in query.iter() {
132            let this_chat_index = client_chat_state.chat_index;
133            if this_chat_index < new_chat_min_index {
134                new_chat_min_index = this_chat_index;
135            }
136        }
137
138        if global_chat_state.chat_min_index > new_chat_min_index {
139            return;
140        }
141        // remove all messages from the queue that are before the min index
142        for _ in 0..(new_chat_min_index - global_chat_state.chat_min_index) {
143            global_chat_state.chat_queue.pop_front();
144        }
145
146        // update the min index
147        global_chat_state.chat_min_index = new_chat_min_index;
148    }
149}
150
#[cfg(test)]
mod tests {
    use bevy_ecs::{
        event::Events,
        prelude::{Entity, World},
        system::SystemState,
    };

    use super::*;

    /// Builds a minimal app with just the chat systems and their resources.
    fn make_test_app() -> App {
        let mut app = App::new();
        // we add the events like this instead of with .add_event so we can have our own
        // event management in drain_events
        app.init_resource::<Events<ChatReceivedEvent>>();
        app.init_resource::<Events<NewChatMessageEvent>>();
        app.add_systems(
            Update,
            (chat_listener, update_min_index_and_shrink_queue).chain(),
        );
        app.insert_resource(GlobalChatState {
            chat_queue: VecDeque::new(),
            chat_min_index: 0,
        });
        app
    }

    /// Removes and returns every pending `NewChatMessageEvent` packet.
    fn drain_events(ecs: &mut World) -> Vec<ChatPacket> {
        let mut system_state: SystemState<ResMut<Events<NewChatMessageEvent>>> =
            SystemState::new(ecs);
        let mut events = system_state.get_mut(ecs);

        let packets: Vec<ChatPacket> = events
            .drain()
            .map(|NewChatMessageEvent(packet)| packet)
            .collect();
        packets
    }

    /// Simulates `entity` receiving `message` and runs one app update.
    fn receive_chat(app: &mut App, entity: Entity, message: &str) {
        app.world_mut().send_event(ChatReceivedEvent {
            entity,
            packet: ChatPacket::new(message),
        });
        app.update();
    }

    /// Reads the chat index stored on `entity`, panicking if it has none.
    fn chat_index_of(app: &App, entity: Entity) -> usize {
        app.world()
            .get::<ClientChatState>(entity)
            .unwrap()
            .chat_index
    }

    #[tokio::test]
    async fn test_swarm_chat() {
        let mut app = make_test_app();

        let bot0 = app.world_mut().spawn_empty().id();
        let bot1 = app.world_mut().spawn_empty().id();

        // the swarm should get the event immediately after the bot gets it
        receive_chat(&mut app, bot0, "a");
        assert_eq!(drain_events(app.world_mut()), vec![ChatPacket::new("a")]);
        assert_eq!(chat_index_of(&app, bot0), 1);

        // and a second bot receiving the same message shouldn't do anything
        receive_chat(&mut app, bot1, "a");
        assert_eq!(drain_events(app.world_mut()), vec![]);
        assert_eq!(chat_index_of(&app, bot1), 1);

        // but if the first one gets it again, it should send it again
        receive_chat(&mut app, bot0, "a");
        assert_eq!(drain_events(app.world_mut()), vec![ChatPacket::new("a")]);

        // alright and now the second bot got a different chat message and it should be
        // sent
        receive_chat(&mut app, bot1, "b");
        assert_eq!(drain_events(app.world_mut()), vec![ChatPacket::new("b")]);
    }

    #[tokio::test]
    async fn test_new_bot() {
        let mut app = make_test_app();

        let bot0 = app.world_mut().spawn_empty().id();

        // bot0 gets a chat message
        receive_chat(&mut app, bot0, "a");
        assert_eq!(drain_events(app.world_mut()), vec![ChatPacket::new("a")]);

        // a bot spawned afterwards still reports a message nobody has seen
        let bot1 = app.world_mut().spawn_empty().id();
        receive_chat(&mut app, bot1, "b");
        assert_eq!(drain_events(app.world_mut()), vec![ChatPacket::new("b")]);
    }
}