azalea/swarm/
chat.rs

1//! Implements `SwarmEvent::Chat`.
2
3// How the chat event works (to avoid firing the event multiple times):
4// ---
5// There's a shared queue of all the chat messages
6// Each bot contains an index of the farthest message we've seen
// When a bot receives a chat message, it looks into the queue to find the
8// earliest instance of the message content that's after the bot's chat index.
9// If it finds it, then its personal index is simply updated. Otherwise, fire
10// the event and add to the queue.
11//
12// To make sure the queue doesn't grow too large, we keep a `chat_min_index`
13// in Swarm that's set to the smallest index of all the bots, and we remove all
14// messages from the queue that are before that index.
15
16use std::collections::VecDeque;
17
18use azalea_client::chat::{ChatPacket, ChatReceivedEvent};
19use bevy_app::{App, Plugin, Update};
20
21use super::{Swarm, SwarmEvent};
22use crate::ecs::prelude::*;
23
24#[derive(Clone)]
25pub struct SwarmChatPlugin;
26impl Plugin for SwarmChatPlugin {
27    fn build(&self, app: &mut App) {
28        app.add_message::<NewChatMessageEvent>()
29            .add_systems(
30                Update,
31                (chat_listener, update_min_index_and_shrink_queue).chain(),
32            )
33            .insert_resource(GlobalChatState {
34                chat_queue: VecDeque::new(),
35                chat_min_index: 0,
36            });
37    }
38}
39
40#[derive(Component, Debug)]
41pub struct ClientChatState {
42    pub chat_index: usize,
43}
44
45/// A chat message that no other bots have seen yet was received by a bot.
46#[derive(Message, Debug)]
47pub struct NewChatMessageEvent(ChatPacket);
48
49#[derive(Resource)]
50pub struct GlobalChatState {
51    pub chat_queue: VecDeque<ChatPacket>,
52    pub chat_min_index: usize,
53}
54
55fn chat_listener(
56    mut commands: Commands,
57    mut query: Query<&mut ClientChatState>,
58    mut events: MessageReader<ChatReceivedEvent>,
59    mut global_chat_state: ResMut<GlobalChatState>,
60    mut new_chat_messages_events: MessageWriter<NewChatMessageEvent>,
61) {
62    for event in events.read() {
63        let mut client_chat_state = query.get_mut(event.entity);
64        let mut client_chat_index = if let Ok(ref client_chat_state) = client_chat_state {
65            client_chat_state.chat_index
66        } else {
67            // if the client has no chat state, we default to this and insert it at the end
68            global_chat_state.chat_min_index
69        };
70
71        // When a bot receives a chat messages, it looks into the queue to find the
72        // earliest instance of the message content that's after the bot's chat index.
73        // If it finds it, then its personal index is simply updated. Otherwise, fire
74        // the event and add to the queue.
75
76        let actual_vec_index = client_chat_index - global_chat_state.chat_min_index;
77
78        // go through the queue and find the first message that's after the bot's index
79        let mut found = false;
80        for (i, past_message) in global_chat_state
81            .chat_queue
82            .iter()
83            .enumerate()
84            .skip(actual_vec_index)
85        {
86            if past_message == &event.packet {
87                // found the message, update the index
88                client_chat_index = i + global_chat_state.chat_min_index + 1;
89                found = true;
90                break;
91            }
92        }
93
94        if !found {
95            // didn't find the message, so fire the swarm event and add to the queue
96            new_chat_messages_events.write(NewChatMessageEvent(event.packet.clone()));
97            global_chat_state.chat_queue.push_back(event.packet.clone());
98            client_chat_index =
99                global_chat_state.chat_queue.len() + global_chat_state.chat_min_index;
100        }
101        if let Ok(ref mut client_chat_state) = client_chat_state {
102            client_chat_state.chat_index = client_chat_index;
103        } else {
104            commands.entity(event.entity).insert(ClientChatState {
105                chat_index: client_chat_index,
106            });
107        }
108    }
109}
110
111fn update_min_index_and_shrink_queue(
112    query: Query<&ClientChatState>,
113    mut global_chat_state: ResMut<GlobalChatState>,
114    mut events: MessageReader<NewChatMessageEvent>,
115    swarm: Option<Res<Swarm>>,
116) {
117    for event in events.read() {
118        if let Some(swarm) = &swarm {
119            // it should also work if Swarm isn't present (so the tests don't need it)
120            swarm
121                .swarm_tx
122                .send(SwarmEvent::Chat(event.0.clone()))
123                .unwrap();
124        }
125        // To make sure the queue doesn't grow too large, we keep a `chat_min_index`
126        // in Swarm that's set to the smallest index of all the bots, and we remove all
127        // messages from the queue that are before that index.
128
129        let mut new_chat_min_index = global_chat_state.chat_min_index;
130        for client_chat_state in query.iter() {
131            let this_chat_index = client_chat_state.chat_index;
132            if this_chat_index < new_chat_min_index {
133                new_chat_min_index = this_chat_index;
134            }
135        }
136
137        if global_chat_state.chat_min_index > new_chat_min_index {
138            return;
139        }
140        // remove all messages from the queue that are before the min index
141        for _ in 0..(new_chat_min_index - global_chat_state.chat_min_index) {
142            global_chat_state.chat_queue.pop_front();
143        }
144
145        // update the min index
146        global_chat_state.chat_min_index = new_chat_min_index;
147    }
148}
149
#[cfg(test)]
mod tests {
    use bevy_ecs::{prelude::World, system::SystemState};

    use super::*;

    /// Builds an app containing only the two chat systems and their state.
    fn make_test_app() -> App {
        let mut app = App::new();
        // The message resources are initialised directly instead of through
        // `.add_message`, so the tests do their own message management in
        // `drain_messages`.
        app.init_resource::<Messages<ChatReceivedEvent>>()
            .init_resource::<Messages<NewChatMessageEvent>>()
            .insert_resource(GlobalChatState {
                chat_min_index: 0,
                chat_queue: VecDeque::new(),
            })
            .add_systems(
                Update,
                (chat_listener, update_min_index_and_shrink_queue).chain(),
            );
        app
    }

    /// Removes and returns the packets of every pending `NewChatMessageEvent`.
    fn drain_messages(ecs: &mut World) -> Vec<ChatPacket> {
        let mut state = SystemState::<ResMut<Messages<NewChatMessageEvent>>>::new(ecs);
        let mut pending = state.get_mut(ecs);

        pending.drain().map(|message| message.0.clone()).collect()
    }

    /// Queues `event` as a received chat message and runs one app update.
    fn deliver(app: &mut App, event: ChatReceivedEvent) {
        app.world_mut().write_message(event);
        app.update();
    }

    #[tokio::test]
    async fn test_swarm_chat() {
        let mut app = make_test_app();

        let bot0 = app.world_mut().spawn_empty().id();
        let bot1 = app.world_mut().spawn_empty().id();

        // the swarm should get the event immediately after the first bot gets it
        deliver(
            &mut app,
            ChatReceivedEvent {
                entity: bot0,
                packet: ChatPacket::new("a"),
            },
        );
        assert_eq!(drain_messages(app.world_mut()), vec![ChatPacket::new("a")]);
        assert_eq!(
            app.world().get::<ClientChatState>(bot0).unwrap().chat_index,
            1
        );

        // a second bot receiving the same message shouldn't fire anything
        deliver(
            &mut app,
            ChatReceivedEvent {
                entity: bot1,
                packet: ChatPacket::new("a"),
            },
        );
        assert!(drain_messages(app.world_mut()).is_empty());
        assert_eq!(
            app.world().get::<ClientChatState>(bot1).unwrap().chat_index,
            1
        );

        // but if the first bot gets it again, it should be sent again
        deliver(
            &mut app,
            ChatReceivedEvent {
                entity: bot0,
                packet: ChatPacket::new("a"),
            },
        );
        assert_eq!(drain_messages(app.world_mut()), vec![ChatPacket::new("a")]);

        // and a different message received by the second bot should be sent too
        deliver(
            &mut app,
            ChatReceivedEvent {
                entity: bot1,
                packet: ChatPacket::new("b"),
            },
        );
        assert_eq!(drain_messages(app.world_mut()), vec![ChatPacket::new("b")]);
    }

    #[tokio::test]
    async fn test_new_bot() {
        let mut app = make_test_app();

        let bot0 = app.world_mut().spawn_empty().id();

        // bot0 gets a chat message
        deliver(
            &mut app,
            ChatReceivedEvent {
                entity: bot0,
                packet: ChatPacket::new("a"),
            },
        );
        assert_eq!(drain_messages(app.world_mut()), vec![ChatPacket::new("a")]);

        // a bot spawned afterwards receives a message nobody has reported yet
        let bot1 = app.world_mut().spawn_empty().id();
        deliver(
            &mut app,
            ChatReceivedEvent {
                entity: bot1,
                packet: ChatPacket::new("b"),
            },
        );
        assert_eq!(drain_messages(app.world_mut()), vec![ChatPacket::new("b")]);
    }
}