azalea/swarm/
mod.rs

1//! Swarms are a way to conveniently control many bots.
2//!
3//! See [`Swarm`] for more information.
4
5mod builder;
6mod chat;
7mod events;
8pub mod prelude;
9
10use std::sync::{
11    Arc,
12    atomic::{self, AtomicBool},
13};
14
15use azalea_client::{Account, Client, Event, StartClientOpts, chat::ChatPacket, join::ConnectOpts};
16use azalea_entity::LocalEntity;
17use azalea_protocol::address::ResolvedAddr;
18use azalea_world::InstanceContainer;
19use bevy_app::{PluginGroup, PluginGroupBuilder};
20use bevy_ecs::prelude::*;
21pub use builder::SwarmBuilder;
22use futures::future::BoxFuture;
23use parking_lot::{Mutex, RwLock};
24use tokio::{sync::mpsc, task};
25use tracing::{debug, error, warn};
26
27use crate::JoinOpts;
28
29/// A swarm is a way to conveniently control many bots at once, while also
30/// being able to control bots at an individual level when desired.
31///
32/// It can safely be cloned, so there should be no need to wrap them in a Mutex.
33///
34/// Swarms are created from [`SwarmBuilder`].
35///
36/// Clients can be added to the swarm later via [`Swarm::add`], and can be
37/// removed with [`Client::disconnect`].
38#[derive(Clone, Resource)]
39pub struct Swarm {
40    pub ecs_lock: Arc<Mutex<World>>,
41
42    // the address is public and mutable so plugins can change it
43    pub address: Arc<RwLock<ResolvedAddr>>,
44
45    pub instance_container: Arc<RwLock<InstanceContainer>>,
46
47    /// This is used internally to make the client handler function work.
48    pub(crate) bots_tx: mpsc::UnboundedSender<(Option<Event>, Client)>,
49    /// This is used internally to make the swarm handler function work.
50    pub(crate) swarm_tx: mpsc::UnboundedSender<SwarmEvent>,
51}
52
53/// An event about something that doesn't have to do with a single bot.
54#[derive(Clone, Debug)]
55#[non_exhaustive]
56pub enum SwarmEvent {
57    /// All the bots in the swarm have successfully joined the server.
58    Login,
59    /// The swarm was created.
60    ///
61    /// This is only fired once, and it's guaranteed to be the first event to
62    /// fire.
63    Init,
64    /// A bot got disconnected from the server.
65    ///
66    /// If you'd like to implement special auto-reconnect behavior beyond what's
67    /// built-in, you can disable that with [`SwarmBuilder::reconnect_delay`]
68    /// and then call [`Swarm::add_with_opts`] with the account and options
69    /// from this event.
70    ///
71    /// [`SwarmBuilder::reconnect_delay`]: crate::swarm::SwarmBuilder::reconnect_after
72    Disconnect(Box<Account>, Box<JoinOpts>),
73    /// At least one bot received a chat message.
74    Chat(ChatPacket),
75}
76
77pub type SwarmHandleFn<SS, Fut> = fn(Swarm, SwarmEvent, SS) -> Fut;
78pub type BoxSwarmHandleFn<SS, R> =
79    Box<dyn Fn(Swarm, SwarmEvent, SS) -> BoxFuture<'static, R> + Send + Sync>;
80
81/// Make a bot [`Swarm`].
82///
83/// [`Swarm`]: struct.Swarm.html
84///
85/// # Examples
86/// ```rust,no_run
87/// use azalea::{prelude::*, swarm::prelude::*};
88/// use std::time::Duration;
89///
90/// #[derive(Clone, Component, Default)]
91/// struct State {}
92///
93/// #[derive(Clone, Default, Resource)]
94/// struct SwarmState {}
95///
96/// #[tokio::main]
97/// async fn main() -> AppExit {
98///     let mut accounts = Vec::new();
99///     let mut states = Vec::new();
100///
101///     for i in 0..10 {
102///         accounts.push(Account::offline(&format!("bot{i}")));
103///         states.push(State::default());
104///     }
105///
106///     SwarmBuilder::new()
107///         .add_accounts(accounts.clone())
108///         .set_handler(handle)
109///         .set_swarm_handler(swarm_handle)
110///         .join_delay(Duration::from_millis(1000))
111///         .start("localhost")
112///         .await
113/// }
114///
115/// async fn handle(bot: Client, event: Event, _state: State) -> anyhow::Result<()> {
116///     match &event {
117///         _ => {}
118///     }
119///     Ok(())
120/// }
121///
122/// async fn swarm_handle(
123///     mut swarm: Swarm,
124///     event: SwarmEvent,
125///     _state: SwarmState,
126/// ) -> anyhow::Result<()> {
127///     match &event {
128///         SwarmEvent::Chat(m) => {
129///             println!("{}", m.message().to_ansi());
130///         }
131///         _ => {}
132///     }
133///     Ok(())
134/// }
135impl Swarm {
136    /// Add a new account to the swarm.
137    ///
138    /// You can remove it later by calling [`Client::disconnect`].
139    ///
140    /// # Errors
141    ///
142    /// Returns an error if the server's address could not be resolved.
143    pub async fn add<S: Component + Clone>(&self, account: &Account, state: S) -> Client {
144        self.add_with_opts(account, state, &JoinOpts::default())
145            .await
146    }
147    /// Add a new account to the swarm, using custom options.
148    ///
149    /// This is useful if you want bots in the same swarm to connect to
150    /// different addresses. Usually you'll just want [`Self::add`] though.
151    ///
152    /// # Errors
153    ///
154    /// Returns an error if the server's address could not be resolved.
155    pub async fn add_with_opts<S: Component + Clone>(
156        &self,
157        account: &Account,
158        state: S,
159        join_opts: &JoinOpts,
160    ) -> Client {
161        debug!(
162            "add_with_opts called for account {} with opts {join_opts:?}",
163            account.username
164        );
165
166        let mut address = self.address.read().clone();
167        if let Some(custom_server_addr) = join_opts.custom_server_addr.clone() {
168            address.server = custom_server_addr;
169        }
170        if let Some(custom_socket_addr) = join_opts.custom_socket_addr {
171            address.socket = custom_socket_addr;
172        }
173        let server_proxy = join_opts.server_proxy.clone();
174        let sessionserver_proxy = join_opts.sessionserver_proxy.clone();
175
176        let (tx, rx) = mpsc::unbounded_channel();
177
178        let client = Client::start_client(StartClientOpts {
179            ecs_lock: self.ecs_lock.clone(),
180            account: account.clone(),
181            connect_opts: ConnectOpts {
182                address,
183                server_proxy,
184                sessionserver_proxy,
185            },
186            event_sender: Some(tx),
187        })
188        .await;
189        // add the state to the client
190        {
191            let mut ecs = self.ecs_lock.lock();
192            ecs.entity_mut(client.entity).insert(state);
193        }
194
195        let cloned_bot = client.clone();
196        let swarm_tx = self.swarm_tx.clone();
197        let bots_tx = self.bots_tx.clone();
198
199        let join_opts = join_opts.clone();
200        task::spawn_local(Self::event_copying_task(
201            rx, swarm_tx, bots_tx, cloned_bot, join_opts,
202        ));
203
204        client
205    }
206
207    /// Copy the events from a client's receiver into bots_tx, until the bot is
208    /// removed from the ECS.
209    async fn event_copying_task(
210        mut rx: mpsc::UnboundedReceiver<Event>,
211        swarm_tx: mpsc::UnboundedSender<SwarmEvent>,
212        bots_tx: mpsc::UnboundedSender<(Option<Event>, Client)>,
213        bot: Client,
214        join_opts: JoinOpts,
215    ) {
216        while let Some(event) = rx.recv().await {
217            if rx.len() > 1_000 {
218                static WARNED_1_000: AtomicBool = AtomicBool::new(false);
219                if !WARNED_1_000.swap(true, atomic::Ordering::Relaxed) {
220                    warn!(
221                        "the client's Event channel has more than 1,000 items! this is probably fine but if you're concerned about it, maybe consider disabling the packet-event feature in azalea to reduce the number of events?"
222                    )
223                }
224
225                if rx.len() > 10_000 {
226                    static WARNED_10_000: AtomicBool = AtomicBool::new(false);
227                    if !WARNED_10_000.swap(true, atomic::Ordering::Relaxed) {
228                        warn!("the client's Event channel has more than 10,000 items!!")
229                    }
230
231                    if rx.len() > 100_000 {
232                        static WARNED_100_000: AtomicBool = AtomicBool::new(false);
233                        if !WARNED_100_000.swap(true, atomic::Ordering::Relaxed) {
234                            warn!("the client's Event channel has more than 100,000 items!!!")
235                        }
236
237                        if rx.len() > 1_000_000 {
238                            static WARNED_1_000_000: AtomicBool = AtomicBool::new(false);
239                            if !WARNED_1_000_000.swap(true, atomic::Ordering::Relaxed) {
240                                warn!(
241                                    "the client's Event channel has more than 1,000,000 items!!!! your code is almost certainly leaking memory"
242                                )
243                            }
244                        }
245                    }
246                }
247            }
248
249            if let Event::Disconnect(_) = event {
250                debug!(
251                    "sending SwarmEvent::Disconnect due to receiving an Event::Disconnect from client {}",
252                    bot.entity
253                );
254                let account = bot
255                    .get_component::<Account>()
256                    .expect("bot is missing required Account component");
257                swarm_tx
258                    .send(SwarmEvent::Disconnect(
259                        Box::new(account),
260                        Box::new(join_opts.clone()),
261                    ))
262                    .unwrap();
263            }
264
265            // we can't handle events here (since we can't copy the handler),
266            // they're handled above in SwarmBuilder::start
267            if let Err(e) = bots_tx.send((Some(event), bot.clone())) {
268                error!(
269                    "Error sending event to swarm, aborting event_copying_task for {}: {e}",
270                    bot.entity
271                );
272                break;
273            }
274        }
275        debug!(
276            "client sender ended for {}, this won't trigger SwarmEvent::Disconnect unless the client already sent its own disconnect event",
277            bot.entity
278        );
279    }
280
281    /// Get an array of ECS [`Entity`]s for all [`LocalEntity`]s in our world.
282    /// This will include clients that were disconnected without being removed
283    /// from the ECS.
284    ///
285    /// [`LocalEntity`]: azalea_entity::LocalEntity
286    pub fn client_entities(&self) -> Box<[Entity]> {
287        let mut ecs = self.ecs_lock.lock();
288        let mut query = ecs.query_filtered::<Entity, With<LocalEntity>>();
289        query.iter(&ecs).collect::<Box<[Entity]>>()
290    }
291}
292
293impl IntoIterator for Swarm {
294    type Item = Client;
295    type IntoIter = std::vec::IntoIter<Self::Item>;
296
297    /// Iterate over the bots in this swarm.
298    ///
299    /// ```rust,no_run
300    /// # use azalea::{prelude::*, swarm::prelude::*};
301    /// #[derive(Clone, Component)]
302    /// # pub struct State;
303    /// # fn example(swarm: Swarm) {
304    /// for bot in swarm {
305    ///     let state = bot.component::<State>();
306    ///     // ...
307    /// }
308    /// # }
309    /// ```
310    fn into_iter(self) -> Self::IntoIter {
311        let client_entities = self.client_entities();
312
313        client_entities
314            .into_iter()
315            .map(|entity| Client::new(entity, self.ecs_lock.clone()))
316            .collect::<Box<[Client]>>()
317            .into_iter()
318    }
319}
320
321/// This plugin group will add all the default plugins necessary for swarms to
322/// work.
323pub struct DefaultSwarmPlugins;
324
325impl PluginGroup for DefaultSwarmPlugins {
326    fn build(self) -> PluginGroupBuilder {
327        PluginGroupBuilder::start::<Self>()
328            .add(chat::SwarmChatPlugin)
329            .add(events::SwarmPlugin)
330    }
331}
332
333/// A marker that can be used in place of a SwarmState in [`SwarmBuilder`].
334///
335/// You probably don't need to use this manually since the compiler will infer
336/// it for you.
337#[derive(Clone, Default, Resource)]
338pub struct NoSwarmState;