azalea/swarm/
mod.rs

1//! Swarms are a way to conveniently control many bots.
2//!
3//! See [`Swarm`] for more information.
4
5mod chat;
6mod events;
7pub mod prelude;
8
9use std::{
10    collections::{HashMap, hash_map},
11    future::Future,
12    net::SocketAddr,
13    sync::{
14        Arc,
15        atomic::{self, AtomicBool},
16    },
17    time::Duration,
18};
19
20use azalea_client::{
21    Account, Client, DefaultPlugins, Event, JoinError, StartClientOpts, chat::ChatPacket,
22    start_ecs_runner,
23};
24use azalea_protocol::{ServerAddress, resolver};
25use azalea_world::InstanceContainer;
26use bevy_app::{App, PluginGroup, PluginGroupBuilder, Plugins};
27use bevy_ecs::{component::Component, entity::Entity, system::Resource, world::World};
28use futures::future::{BoxFuture, join_all};
29use parking_lot::{Mutex, RwLock};
30use tokio::sync::mpsc;
31use tracing::{debug, error, warn};
32
33use crate::{BoxHandleFn, DefaultBotPlugins, HandleFn, JoinOpts, NoState, StartError};
34
/// A swarm is a way to conveniently control many bots at once, while also
/// being able to control bots at an individual level when desired.
///
/// It can safely be cloned, so there should be no need to wrap them in a Mutex.
/// (Every field is either behind an `Arc` or is a channel sender, so clones
/// share the same underlying state.)
///
/// Swarms are created from [`SwarmBuilder`].
///
/// Clients can be added to the swarm later via [`Swarm::add`], and can be
/// removed with [`Client::disconnect`].
#[derive(Clone, Resource)]
pub struct Swarm {
    /// The ECS world shared by the swarm. Every bot that's added through this
    /// swarm is started on this world, and its state component is inserted
    /// here.
    pub ecs_lock: Arc<Mutex<World>>,

    /// The bots that are currently part of the swarm, keyed by their ECS
    /// entity. An entry is inserted when a bot is added and removed again
    /// once that bot's event channel closes.
    bots: Arc<Mutex<HashMap<Entity, Client>>>,

    // the address is public and mutable so plugins can change it
    /// The socket address used for bots whose [`JoinOpts`] don't provide a
    /// custom resolved address.
    pub resolved_address: Arc<RwLock<SocketAddr>>,
    /// The server address used for bots whose [`JoinOpts`] don't provide a
    /// custom address.
    pub address: Arc<RwLock<ServerAddress>>,

    /// Created empty when the swarm starts; nothing else in this module
    /// touches it.
    // NOTE(review): presumably the set of world instances shared between the
    // bots — confirm against the plugins that use it.
    pub instance_container: Arc<RwLock<InstanceContainer>>,

    /// Carries `(event, bot)` pairs from each bot's event-copying task to the
    /// dispatch loop in [`SwarmBuilder::start_with_default_opts`], which calls
    /// the per-bot handler.
    bots_tx: mpsc::UnboundedSender<(Option<Event>, Client)>,
    /// Carries [`SwarmEvent`]s to the task that calls the swarm handler.
    swarm_tx: mpsc::UnboundedSender<SwarmEvent>,

    /// Passed to `start_ecs_runner` and handed to every client started by
    /// this swarm.
    // NOTE(review): presumably used to request a run of the ECS schedule —
    // confirm in azalea_client.
    run_schedule_sender: mpsc::Sender<()>,
}
61
/// Create a new [`Swarm`].
///
/// This is a builder: accounts, plugins, handlers and state are configured
/// with the chainable methods, and the swarm is actually started with
/// [`SwarmBuilder::start`].
///
/// The generics of this struct stand for the following:
/// - S: State
/// - SS: Swarm State
/// - R: Return type of the handler
/// - SR: Return type of the swarm handler
///
/// You shouldn't have to manually set them though, they'll be inferred for you.
pub struct SwarmBuilder<S, SS, R, SR>
where
    S: Send + Sync + Clone + Component + 'static,
    SS: Default + Send + Sync + Clone + Resource + 'static,
{
    /// The app that plugins are added to. It's created by the builder and
    /// handed to the ECS runner when the swarm is started.
    pub(crate) app: App,
    /// The accounts and proxies that are going to join the server.
    pub(crate) accounts: Vec<(Account, JoinOpts)>,
    /// The individual bot states. This must be the same length as `accounts`,
    /// since each bot gets one state. (This is asserted when the swarm is
    /// started.)
    pub(crate) states: Vec<S>,
    /// The state for the overall swarm.
    pub(crate) swarm_state: SS,
    /// The function that's called every time a bot receives an [`Event`].
    /// `None` until [`SwarmBuilder::set_handler`] is called.
    pub(crate) handler: Option<BoxHandleFn<S, R>>,
    /// The function that's called every time the swarm receives a
    /// [`SwarmEvent`]. `None` until [`SwarmBuilder::set_swarm_handler`] is
    /// called.
    pub(crate) swarm_handler: Option<BoxSwarmHandleFn<SS, SR>>,

    /// How long we should wait between each bot joining the server. Set to
    /// None to have every bot connect at the same time. None is different than
    /// a duration of 0, since if a duration is present the bots will wait for
    /// the previous one to be ready.
    pub(crate) join_delay: Option<std::time::Duration>,
}
96impl SwarmBuilder<NoState, NoSwarmState, (), ()> {
97    /// Start creating the swarm.
98    #[must_use]
99    pub fn new() -> Self {
100        Self::new_without_plugins()
101            .add_plugins(DefaultPlugins)
102            .add_plugins(DefaultBotPlugins)
103            .add_plugins(DefaultSwarmPlugins)
104    }
105
106    /// [`Self::new`] but without adding the plugins by default. This is useful
107    /// if you want to disable a default plugin.
108    ///
109    /// You **must** add [`DefaultPlugins`], [`DefaultBotPlugins`], and
110    /// [`DefaultSwarmPlugins`] to this.
111    ///
112    /// ```
113    /// # use azalea::{prelude::*, swarm::prelude::*};
114    /// use azalea::{app::PluginGroup, DefaultBotPlugins, DefaultPlugins, swarm::{DefaultSwarmPlugins}};
115    /// use bevy_log::LogPlugin;
116    ///
117    /// let swarm_builder = SwarmBuilder::new_without_plugins()
118    ///     .add_plugins(DefaultPlugins.build().disable::<LogPlugin>())
119    ///     .add_plugins(DefaultBotPlugins)
120    ///     .add_plugins(DefaultSwarmPlugins);
121    /// # swarm_builder.set_handler(handle).set_swarm_handler(swarm_handle);
122    /// # #[derive(Component, Resource, Clone, Default)]
123    /// # pub struct State;
124    /// # async fn handle(mut bot: Client, event: Event, state: State) -> anyhow::Result<()> {
125    /// #     Ok(())
126    /// # }
127    /// # async fn swarm_handle(swarm: Swarm, event: SwarmEvent, state: State) -> anyhow::Result<()> {
128    /// #     Ok(())
129    /// # }
130    /// ```
131    #[must_use]
132    pub fn new_without_plugins() -> Self {
133        SwarmBuilder {
134            // we create the app here so plugins can add onto it.
135            // the schedules won't run until [`Self::start`] is called.
136            app: App::new(),
137            accounts: Vec::new(),
138            states: Vec::new(),
139            swarm_state: NoSwarmState,
140            handler: None,
141            swarm_handler: None,
142            join_delay: None,
143        }
144    }
145}
146
147impl<SS, SR> SwarmBuilder<NoState, SS, (), SR>
148where
149    SS: Default + Send + Sync + Clone + Resource + 'static,
150{
151    /// Set the function that's called every time a bot receives an [`Event`].
152    /// This is the way to handle normal per-bot events.
153    ///
154    /// Currently you can have up to one handler.
155    ///
156    /// ```
157    /// # use azalea::{prelude::*, swarm::prelude::*};
158    /// # let swarm_builder = SwarmBuilder::new().set_swarm_handler(swarm_handle);
159    /// swarm_builder.set_handler(handle);
160    ///
161    /// #[derive(Component, Default, Clone)]
162    /// struct State {}
163    /// async fn handle(mut bot: Client, event: Event, state: State) -> anyhow::Result<()> {
164    ///     Ok(())
165    /// }
166    ///
167    /// # #[derive(Resource, Default, Clone)]
168    /// # struct SwarmState {}
169    /// # async fn swarm_handle(
170    /// #     mut swarm: Swarm,
171    /// #     event: SwarmEvent,
172    /// #     state: SwarmState,
173    /// # ) -> anyhow::Result<()> {
174    /// #     Ok(())
175    /// # }
176    /// ```
177    #[must_use]
178    pub fn set_handler<S, Fut, R>(self, handler: HandleFn<S, Fut>) -> SwarmBuilder<S, SS, R, SR>
179    where
180        Fut: Future<Output = R> + Send + 'static,
181        S: Send + Sync + Clone + Component + Default + 'static,
182    {
183        SwarmBuilder {
184            handler: Some(Box::new(move |bot, event, state: S| {
185                Box::pin(handler(bot, event, state))
186            })),
187            // if we added accounts before the State was set, we've gotta set it to the default now
188            states: vec![S::default(); self.accounts.len()],
189            app: self.app,
190            ..self
191        }
192    }
193}
194
195impl<S, R> SwarmBuilder<S, NoSwarmState, R, ()>
196where
197    S: Send + Sync + Clone + Component + 'static,
198{
199    /// Set the function that's called every time the swarm receives a
200    /// [`SwarmEvent`]. This is the way to handle global swarm events.
201    ///
202    /// Currently you can have up to one swarm handler.
203    ///
204    /// ```
205    /// # use azalea::{prelude::*, swarm::prelude::*};
206    /// # let swarm_builder = SwarmBuilder::new().set_handler(handle);
207    /// swarm_builder.set_swarm_handler(swarm_handle);
208    ///
209    /// # #[derive(Component, Default, Clone)]
210    /// # struct State {}
211    ///
212    /// # async fn handle(mut bot: Client, event: Event, state: State) -> anyhow::Result<()> {
213    /// #     Ok(())
214    /// # }
215    ///
216    /// #[derive(Resource, Default, Clone)]
217    /// struct SwarmState {}
218    /// async fn swarm_handle(
219    ///     mut swarm: Swarm,
220    ///     event: SwarmEvent,
221    ///     state: SwarmState,
222    /// ) -> anyhow::Result<()> {
223    ///     Ok(())
224    /// }
225    /// ```
226    #[must_use]
227    pub fn set_swarm_handler<SS, Fut, SR>(
228        self,
229        handler: SwarmHandleFn<SS, Fut>,
230    ) -> SwarmBuilder<S, SS, R, SR>
231    where
232        SS: Default + Send + Sync + Clone + Resource + 'static,
233        Fut: Future<Output = SR> + Send + 'static,
234    {
235        SwarmBuilder {
236            handler: self.handler,
237            app: self.app,
238            accounts: self.accounts,
239            states: self.states,
240            swarm_state: SS::default(),
241            swarm_handler: Some(Box::new(move |swarm, event, state| {
242                Box::pin(handler(swarm, event, state))
243            })),
244            join_delay: self.join_delay,
245        }
246    }
247}
248
249impl<S, SS, R, SR> SwarmBuilder<S, SS, R, SR>
250where
251    S: Send + Sync + Clone + Component + 'static,
252    SS: Default + Send + Sync + Clone + Resource + 'static,
253    R: Send + 'static,
254    SR: Send + 'static,
255{
256    /// Add a vec of [`Account`]s to the swarm.
257    ///
258    /// Use [`Self::add_account`] to only add one account. If you want the
259    /// clients to have different default states, add them one at a time with
260    /// [`Self::add_account_with_state`].
261    ///
262    /// By default, every account will join at the same time, you can add a
263    /// delay with [`Self::join_delay`].
264    #[must_use]
265    pub fn add_accounts(mut self, accounts: Vec<Account>) -> Self
266    where
267        S: Default,
268    {
269        for account in accounts {
270            self = self.add_account(account);
271        }
272
273        self
274    }
275
276    /// Add a single new [`Account`] to the swarm. Use [`Self::add_accounts`] to
277    /// add multiple accounts at a time.
278    ///
279    /// This will make the state for this client be the default, use
280    /// [`Self::add_account_with_state`] to avoid that.
281    #[must_use]
282    pub fn add_account(self, account: Account) -> Self
283    where
284        S: Default,
285    {
286        self.add_account_with_state_and_opts(account, S::default(), JoinOpts::default())
287    }
288
289    /// Add an account with a custom initial state. Use just
290    /// [`Self::add_account`] to use the Default implementation for the state.
291    #[must_use]
292    pub fn add_account_with_state(self, account: Account, state: S) -> Self {
293        self.add_account_with_state_and_opts(account, state, JoinOpts::default())
294    }
295
296    /// Add an account with a custom initial state. Use just
297    /// [`Self::add_account`] to use the Default implementation for the state.
298    #[must_use]
299    pub fn add_account_with_opts(self, account: Account, opts: JoinOpts) -> Self
300    where
301        S: Default,
302    {
303        self.add_account_with_state_and_opts(account, S::default(), opts)
304    }
305
306    /// Same as [`Self::add_account_with_state`], but allow passing in custom
307    /// join options.
308    #[must_use]
309    pub fn add_account_with_state_and_opts(
310        mut self,
311        account: Account,
312        state: S,
313        join_opts: JoinOpts,
314    ) -> Self {
315        self.accounts.push((account, join_opts));
316        self.states.push(state);
317        self
318    }
319
320    /// Set the swarm state instead of initializing defaults.
321    #[must_use]
322    pub fn set_swarm_state(mut self, swarm_state: SS) -> Self {
323        self.swarm_state = swarm_state;
324        self
325    }
326
327    /// Add one or more plugins to this swarm.
328    #[must_use]
329    pub fn add_plugins<M>(mut self, plugins: impl Plugins<M>) -> Self {
330        self.app.add_plugins(plugins);
331        self
332    }
333
334    /// Set how long we should wait between each bot joining the server.
335    ///
336    /// By default, every bot will connect at the same time. If you set this
337    /// field, however, the bots will wait for the previous one to have
338    /// connected and *then* they'll wait the given duration.
339    #[must_use]
340    pub fn join_delay(mut self, delay: std::time::Duration) -> Self {
341        self.join_delay = Some(delay);
342        self
343    }
344
345    /// Build this `SwarmBuilder` into an actual [`Swarm`] and join the given
346    /// server.
347    ///
348    /// The `address` argument can be a `&str`, [`ServerAddress`], or anything
349    /// that implements `TryInto<ServerAddress>`.
350    ///
351    /// [`ServerAddress`]: azalea_protocol::ServerAddress
352    pub async fn start(self, address: impl TryInto<ServerAddress>) -> Result<!, StartError> {
353        // convert the TryInto<ServerAddress> into a ServerAddress
354        let address: ServerAddress = match address.try_into() {
355            Ok(address) => address,
356            Err(_) => return Err(StartError::InvalidAddress),
357        };
358
359        self.start_with_default_opts(address, JoinOpts::default())
360            .await
361    }
362
    /// Do the same as [`Self::start`], but allow passing in default join
    /// options for the bots.
    ///
    /// Each bot's own [`JoinOpts`] are applied on top of a copy of
    /// `default_join_opts`. If `default_join_opts.custom_address` is set it
    /// replaces `address` as the swarm's address (though `address` must still
    /// be convertible), and if `default_join_opts.custom_resolved_address` is
    /// set the address isn't resolved.
    ///
    /// This function only returns on error: on success it keeps dispatching
    /// bot events to the handler forever.
    ///
    /// # Errors
    ///
    /// Returns [`StartError::InvalidAddress`] if `address` can't be converted
    /// into a [`ServerAddress`], or the propagated error if resolving the
    /// address fails.
    ///
    /// # Panics
    ///
    /// Panics if the number of accounts differs from the number of states, or
    /// if the app's main sub-app has no update schedule.
    pub async fn start_with_default_opts(
        self,
        address: impl TryInto<ServerAddress>,
        default_join_opts: JoinOpts,
    ) -> Result<!, StartError> {
        assert_eq!(
            self.accounts.len(),
            self.states.len(),
            "There must be exactly one state per bot."
        );

        debug!("Starting Azalea {}", env!("CARGO_PKG_VERSION"));

        // convert the TryInto<ServerAddress> into a ServerAddress
        let address = match address.try_into() {
            Ok(address) => address,
            Err(_) => return Err(StartError::InvalidAddress),
        };

        // a custom address in the default join opts wins over the argument
        let address: ServerAddress = default_join_opts.custom_address.clone().unwrap_or(address);
        // only hit the resolver if no resolved address was supplied
        let resolved_address = if let Some(a) = default_join_opts.custom_resolved_address {
            a
        } else {
            resolver::resolve_address(&address).await?
        };

        let instance_container = Arc::new(RwLock::new(InstanceContainer::default()));

        // we can't modify the swarm plugins after this
        let (bots_tx, mut bots_rx) = mpsc::unbounded_channel();
        let (swarm_tx, mut swarm_rx) = mpsc::unbounded_channel();

        // queued before anything else can send on this channel, so Init is
        // always the first swarm event
        swarm_tx.send(SwarmEvent::Init).unwrap();

        let (run_schedule_sender, run_schedule_receiver) = mpsc::channel(1);

        // grab the label before `self.app` is moved into the ECS runner
        let main_schedule_label = self.app.main().update_schedule.unwrap();

        let ecs_lock =
            start_ecs_runner(self.app, run_schedule_receiver, run_schedule_sender.clone());

        let swarm = Swarm {
            ecs_lock: ecs_lock.clone(),
            bots: Arc::new(Mutex::new(HashMap::new())),

            resolved_address: Arc::new(RwLock::new(resolved_address)),
            address: Arc::new(RwLock::new(address)),
            instance_container,

            bots_tx,

            swarm_tx: swarm_tx.clone(),

            run_schedule_sender,
        };

        // run the main schedule so the startup systems run
        {
            let mut ecs = ecs_lock.lock();
            // the swarm and the swarm state are inserted first so they're
            // available as resources while the schedule runs
            ecs.insert_resource(swarm.clone());
            ecs.insert_resource(self.swarm_state.clone());
            ecs.run_schedule(main_schedule_label);
            ecs.clear_trackers();
        }

        // SwarmBuilder (self) isn't Send so we have to take all the things we need out
        // of it
        let swarm_clone = swarm.clone();
        let join_delay = self.join_delay;
        let accounts = self.accounts.clone();
        let states = self.states.clone();

        // join task: connects every account that was added to the builder and
        // then announces SwarmEvent::Login
        tokio::spawn(async move {
            if let Some(join_delay) = join_delay {
                // if there's a join delay, then join one by one
                for ((account, bot_join_opts), state) in accounts.iter().zip(states) {
                    // start from the defaults and apply this bot's own opts
                    // (presumably `update` overlays the fields that are set —
                    // confirm in JoinOpts)
                    let mut join_opts = default_join_opts.clone();
                    join_opts.update(bot_join_opts);
                    swarm_clone
                        .add_and_retry_forever_with_opts(account, state, &join_opts)
                        .await;
                    // NOTE(review): this also sleeps after the last bot, so
                    // Login is sent one join delay after the final join
                    tokio::time::sleep(join_delay).await;
                }
            } else {
                // otherwise, join all at once
                let swarm_borrow = &swarm_clone;
                join_all(accounts.iter().zip(states).map(
                    |((account, bot_join_opts), state)| async {
                        let mut join_opts = default_join_opts.clone();
                        join_opts.update(bot_join_opts);
                        swarm_borrow
                            .clone()
                            .add_and_retry_forever_with_opts(account, state, &join_opts)
                            .await;
                    },
                ))
                .await;
            }

            // every join above retries until it succeeds, so reaching this
            // point means all of the initial bots have joined
            swarm_tx.send(SwarmEvent::Login).unwrap();
        });

        let swarm_state = self.swarm_state;

        // Watch swarm_rx and send those events to the swarm_handle.
        let swarm_clone = swarm.clone();
        tokio::spawn(async move {
            while let Some(event) = swarm_rx.recv().await {
                // without a swarm handler the events are just drained
                if let Some(swarm_handler) = &self.swarm_handler {
                    // each event is handled in its own task; the handler gets
                    // a clone of the swarm state the builder was started with
                    tokio::spawn((swarm_handler)(
                        swarm_clone.clone(),
                        event,
                        swarm_state.clone(),
                    ));
                }
            }
        });

        // bot events
        //
        // Wait for one event, then drain whatever else is already queued with
        // `try_recv` while holding the ECS lock once for the whole batch.
        //
        // NOTE(review): a `(None, bot)` message doesn't match the pattern
        // below, which would end the loop and reach the `unreachable!`.
        // Nothing in this file sends `None` — confirm no other module does.
        while let Some((Some(first_event), first_bot)) = bots_rx.recv().await {
            if bots_rx.len() > 1_000 {
                // only warn once per process
                static WARNED: AtomicBool = AtomicBool::new(false);
                if !WARNED.swap(true, atomic::Ordering::Relaxed) {
                    warn!("the Client Event channel has more than 1000 items!")
                }
            }

            // without a handler the event is simply dropped
            if let Some(handler) = &self.handler {
                let ecs_mutex = first_bot.ecs.clone();
                let mut ecs = ecs_mutex.lock();
                let Some(first_bot_state) = first_bot.query::<Option<&S>>(&mut ecs).cloned() else {
                    error!(
                        "the first bot ({} / {}) is missing the required state component! none of the client handler functions will be called.",
                        first_bot.profile.name, first_bot.entity
                    );
                    // this event is dropped; go back to waiting for the next
                    continue;
                };
                let first_bot_entity = first_bot.entity;

                // each event is handled in its own task
                tokio::spawn((handler)(first_bot, first_event, first_bot_state.clone()));

                // this makes it not have to keep locking the ecs
                // (states are cached per entity for the duration of this batch)
                let mut states = HashMap::new();
                states.insert(first_bot_entity, first_bot_state);
                while let Ok((Some(event), bot)) = bots_rx.try_recv() {
                    let state = match states.entry(bot.entity) {
                        hash_map::Entry::Occupied(e) => e.into_mut(),
                        hash_map::Entry::Vacant(e) => {
                            let Some(state) = bot.query::<Option<&S>>(&mut ecs).cloned() else {
                                error!(
                                    "one of our bots ({} / {}) is missing the required state component! its client handler function will not be called.",
                                    bot.profile.name, bot.entity
                                );
                                // skip just this event
                                continue;
                            };
                            e.insert(state)
                        }
                    };
                    tokio::spawn((handler)(bot, event, state.clone()));
                }
            }
        }

        unreachable!(
            "bots_rx.recv() should never be None because the bots_tx channel is never closed"
        );
    }
532}
533
534impl Default for SwarmBuilder<NoState, NoSwarmState, (), ()> {
535    fn default() -> Self {
536        Self::new()
537    }
538}
539
/// An event about something that doesn't have to do with a single bot.
///
/// These are passed to the function set with
/// [`SwarmBuilder::set_swarm_handler`].
#[derive(Clone, Debug)]
#[non_exhaustive]
pub enum SwarmEvent {
    /// All the bots in the swarm have successfully joined the server.
    ///
    /// This is sent after every account that was added to the
    /// [`SwarmBuilder`] has joined.
    Login,
    /// The swarm was created. This is only fired once, and it's guaranteed to
    /// be the first event to fire.
    Init,
    /// A bot got disconnected from the server.
    ///
    /// This is sent when the bot's event channel closes. It carries the
    /// bot's [`Account`] and the [`JoinOpts`] it was added with.
    ///
    /// You can implement an auto-reconnect by calling [`Swarm::add_with_opts`]
    /// with the account and options from this event.
    Disconnect(Box<Account>, JoinOpts),
    /// At least one bot received a chat message.
    // NOTE(review): not sent from this file; presumably emitted by the swarm
    // chat plugin — confirm.
    Chat(ChatPacket),
}
557
/// The kind of function accepted by [`SwarmBuilder::set_swarm_handler`]: a
/// plain function pointer that takes the swarm, the event and the swarm state
/// and returns a future.
pub type SwarmHandleFn<SS, Fut> = fn(Swarm, SwarmEvent, SS) -> Fut;
/// The boxed form of a swarm handler that's stored in [`SwarmBuilder`]. It
/// takes the same arguments as [`SwarmHandleFn`] but returns a boxed future
/// that resolves to `R`.
pub type BoxSwarmHandleFn<SS, R> =
    Box<dyn Fn(Swarm, SwarmEvent, SS) -> BoxFuture<'static, R> + Send>;
561
562/// Make a bot [`Swarm`].
563///
564/// [`Swarm`]: struct.Swarm.html
565///
566/// # Examples
567/// ```rust,no_run
568/// use azalea::{prelude::*, swarm::prelude::*};
569/// use std::time::Duration;
570///
571/// #[derive(Default, Clone, Component)]
572/// struct State {}
573///
574/// #[derive(Default, Clone, Resource)]
575/// struct SwarmState {}
576///
577/// #[tokio::main]
578/// async fn main() {
579///     let mut accounts = Vec::new();
580///     let mut states = Vec::new();
581///
582///     for i in 0..10 {
583///         accounts.push(Account::offline(&format!("bot{i}")));
584///         states.push(State::default());
585///     }
586///
587///     SwarmBuilder::new()
588///         .add_accounts(accounts.clone())
589///         .set_handler(handle)
590///         .set_swarm_handler(swarm_handle)
591///         .join_delay(Duration::from_millis(1000))
592///         .start("localhost")
593///         .await
594///         .unwrap();
595/// }
596///
597/// async fn handle(bot: Client, event: Event, _state: State) -> anyhow::Result<()> {
598///     match &event {
599///         _ => {}
600///     }
601///     Ok(())
602/// }
603///
604/// async fn swarm_handle(
605///     mut swarm: Swarm,
606///     event: SwarmEvent,
607///     _state: SwarmState,
608/// ) -> anyhow::Result<()> {
609///     match &event {
610///         SwarmEvent::Disconnect(account, join_opts) => {
611///             // automatically reconnect after 5 seconds
612///             tokio::time::sleep(Duration::from_secs(5)).await;
613///             swarm.add_with_opts(account, State::default(), join_opts).await?;
614///         }
615///         SwarmEvent::Chat(m) => {
616///             println!("{}", m.message().to_ansi());
617///         }
618///         _ => {}
619///     }
620///     Ok(())
/// }
/// ```
622impl Swarm {
623    /// Add a new account to the swarm. You can remove it later by calling
624    /// [`Client::disconnect`].
625    ///
626    /// # Errors
627    ///
628    /// Returns an `Err` if the bot could not do a handshake successfully.
629    pub async fn add<S: Component + Clone>(
630        &mut self,
631        account: &Account,
632        state: S,
633    ) -> Result<Client, JoinError> {
634        self.add_with_opts(account, state, &JoinOpts::default())
635            .await
636    }
637    /// Add a new account to the swarm, using custom options. This is useful if
638    /// you want bots in the same swarm to connect to different addresses.
639    /// Usually you'll just want [`Self::add`] though.
640    ///
641    /// # Errors
642    ///
643    /// Returns an `Err` if the bot could not do a handshake successfully.
644    pub async fn add_with_opts<S: Component + Clone>(
645        &self,
646        account: &Account,
647        state: S,
648        join_opts: &JoinOpts,
649    ) -> Result<Client, JoinError> {
650        debug!("add_with_opts called for account {}", account.username);
651
652        let address = join_opts
653            .custom_address
654            .clone()
655            .unwrap_or_else(|| self.address.read().clone());
656        let resolved_address = join_opts
657            .custom_resolved_address
658            .unwrap_or_else(|| *self.resolved_address.read());
659
660        let (tx, rx) = mpsc::unbounded_channel();
661
662        let bot = Client::start_client(StartClientOpts {
663            ecs_lock: self.ecs_lock.clone(),
664            account,
665            address: &address,
666            resolved_address: &resolved_address,
667            proxy: join_opts.proxy.clone(),
668            run_schedule_sender: self.run_schedule_sender.clone(),
669            event_sender: Some(tx),
670        })
671        .await?;
672        // add the state to the client
673        {
674            let mut ecs = self.ecs_lock.lock();
675            ecs.entity_mut(bot.entity).insert(state);
676        }
677
678        self.bots.lock().insert(bot.entity, bot.clone());
679
680        let cloned_bots = self.bots.clone();
681        let cloned_bots_tx = self.bots_tx.clone();
682        let cloned_bot = bot.clone();
683        let swarm_tx = self.swarm_tx.clone();
684        let join_opts = join_opts.clone();
685        tokio::spawn(Self::event_copying_task(
686            rx,
687            cloned_bots,
688            cloned_bots_tx,
689            cloned_bot,
690            swarm_tx,
691            join_opts,
692        ));
693
694        Ok(bot)
695    }
696
697    async fn event_copying_task(
698        mut rx: mpsc::UnboundedReceiver<Event>,
699        cloned_bots: Arc<Mutex<HashMap<Entity, Client>>>,
700        cloned_bots_tx: mpsc::UnboundedSender<(Option<Event>, Client)>,
701        cloned_bot: Client,
702        swarm_tx: mpsc::UnboundedSender<SwarmEvent>,
703        join_opts: JoinOpts,
704    ) {
705        while let Some(event) = rx.recv().await {
706            if rx.len() > 1_000 {
707                static WARNED_1_000: AtomicBool = AtomicBool::new(false);
708                if !WARNED_1_000.swap(true, atomic::Ordering::Relaxed) {
709                    warn!("the client's Event channel has more than 1000 items!")
710                }
711
712                if rx.len() > 10_000 {
713                    static WARNED_10_000: AtomicBool = AtomicBool::new(false);
714                    if !WARNED_10_000.swap(true, atomic::Ordering::Relaxed) {
715                        warn!("the client's Event channel has more than 10,000 items!!")
716                    }
717
718                    if rx.len() > 100_000 {
719                        static WARNED_100_000: AtomicBool = AtomicBool::new(false);
720                        if !WARNED_100_000.swap(true, atomic::Ordering::Relaxed) {
721                            warn!("the client's Event channel has more than 100,000 items!!!")
722                        }
723
724                        if rx.len() > 1_000_000 {
725                            static WARNED_1_000_000: AtomicBool = AtomicBool::new(false);
726                            if !WARNED_1_000_000.swap(true, atomic::Ordering::Relaxed) {
727                                warn!(
728                                    "the client's Event channel has more than 1,000,000 items!!!! i sincerely hope no one ever sees this warning"
729                                )
730                            }
731                        }
732                    }
733                }
734            }
735
736            // we can't handle events here (since we can't copy the handler),
737            // they're handled above in SwarmBuilder::start
738            if let Err(e) = cloned_bots_tx.send((Some(event), cloned_bot.clone())) {
739                error!("Error sending event to swarm: {e}");
740            }
741        }
742        debug!("client sender ended, removing from cloned_bots and sending SwarmEvent::Disconnect");
743
744        cloned_bots.lock().remove(&cloned_bot.entity);
745        let account = cloned_bot
746            .get_component::<Account>()
747            .expect("bot is missing required Account component");
748        swarm_tx
749            .send(SwarmEvent::Disconnect(Box::new(account), join_opts))
750            .unwrap();
751    }
752
753    /// Add a new account to the swarm, retrying if it couldn't join. This will
754    /// run forever until the bot joins or the task is aborted.
755    ///
756    /// This does exponential backoff (though very limited), starting at 5
757    /// seconds and doubling up to 15 seconds.
758    pub async fn add_and_retry_forever<S: Component + Clone>(
759        &self,
760        account: &Account,
761        state: S,
762    ) -> Client {
763        self.add_and_retry_forever_with_opts(account, state, &JoinOpts::default())
764            .await
765    }
766
767    /// Same as [`Self::add_and_retry_forever`], but allow passing custom join
768    /// options.
769    pub async fn add_and_retry_forever_with_opts<S: Component + Clone>(
770        &self,
771        account: &Account,
772        state: S,
773        opts: &JoinOpts,
774    ) -> Client {
775        let mut disconnects = 0;
776        loop {
777            match self.add_with_opts(account, state.clone(), opts).await {
778                Ok(bot) => return bot,
779                Err(e) => {
780                    disconnects += 1;
781                    let delay = (Duration::from_secs(5) * 2u32.pow(disconnects.min(16)))
782                        .min(Duration::from_secs(15));
783                    let username = account.username.clone();
784
785                    match &e {
786                        JoinError::Disconnect { reason } => {
787                            error!(
788                                "Error joining as {username}, server says: \"{reason}\". Waiting {delay:?} and trying again."
789                            );
790                        }
791                        _ => {
792                            error!(
793                                "Error joining as {username}: {e}. Waiting {delay:?} and trying again."
794                            );
795                        }
796                    }
797
798                    tokio::time::sleep(delay).await;
799                }
800            }
801        }
802    }
803}
804
805impl IntoIterator for Swarm {
806    type Item = Client;
807    type IntoIter = std::vec::IntoIter<Self::Item>;
808
809    /// Iterate over the bots in this swarm.
810    ///
811    /// ```rust,no_run
812    /// # use azalea::{prelude::*, swarm::prelude::*};
813    /// #[derive(Component, Clone)]
814    /// # pub struct State;
815    /// # fn example(swarm: Swarm) {
816    /// for bot in swarm {
817    ///     let state = bot.component::<State>();
818    ///     // ...
819    /// }
820    /// # }
821    /// ```
822    fn into_iter(self) -> Self::IntoIter {
823        self.bots
824            .lock()
825            .clone()
826            .into_values()
827            .collect::<Vec<_>>()
828            .into_iter()
829    }
830}
831
/// This plugin group will add all the default plugins necessary for swarms to
/// work.
///
/// It's added automatically by [`SwarmBuilder::new`]; if you use
/// [`SwarmBuilder::new_without_plugins`] you have to add it yourself.
pub struct DefaultSwarmPlugins;
835
836impl PluginGroup for DefaultSwarmPlugins {
837    fn build(self) -> PluginGroupBuilder {
838        PluginGroupBuilder::start::<Self>()
839            .add(chat::SwarmChatPlugin)
840            .add(events::SwarmPlugin)
841    }
842}
843
/// A marker that can be used in place of a SwarmState in [`SwarmBuilder`]. You
/// probably don't need to use this manually since the compiler will infer it
/// for you.
///
/// This is the swarm state of a freshly created [`SwarmBuilder`], until
/// [`SwarmBuilder::set_swarm_handler`] replaces it.
#[derive(Resource, Clone, Default)]
pub struct NoSwarmState;