azalea/swarm/
mod.rs

1//! Swarms are a way to conveniently control many bots.
2//!
3//! See [`Swarm`] for more information.
4
5mod chat;
6mod events;
7pub mod prelude;
8
9use std::{
10    collections::{HashMap, hash_map},
11    future::Future,
12    net::SocketAddr,
13    sync::{
14        Arc,
15        atomic::{self, AtomicBool},
16    },
17    time::Duration,
18};
19
20use azalea_client::{
21    Account, Client, DefaultPlugins, Event, JoinError, StartClientOpts, chat::ChatPacket,
22    start_ecs_runner,
23};
24use azalea_protocol::{ServerAddress, resolver};
25use azalea_world::InstanceContainer;
26use bevy_app::{App, PluginGroup, PluginGroupBuilder, Plugins};
27use bevy_ecs::{component::Component, entity::Entity, system::Resource, world::World};
28use futures::future::{BoxFuture, join_all};
29use parking_lot::{Mutex, RwLock};
30use tokio::sync::mpsc;
31use tracing::{debug, error, warn};
32
33use crate::{BoxHandleFn, DefaultBotPlugins, HandleFn, JoinOpts, NoState, StartError};
34
/// A swarm is a way to conveniently control many bots at once, while also
/// being able to control bots at an individual level when desired.
///
/// It can safely be cloned, so there should be no need to wrap them in a Mutex.
/// Every field is either an `Arc` or a channel sender, so a clone refers to
/// the same underlying swarm.
///
/// Swarms are created from [`SwarmBuilder`].
///
/// Clients can be added to the swarm later via [`Swarm::add`], and can be
/// removed with [`Client::disconnect`].
#[derive(Clone, Resource)]
pub struct Swarm {
    /// The ECS world that the bots of this swarm are started in. The per-bot
    /// state component is inserted into this world when a bot is added.
    pub ecs_lock: Arc<Mutex<World>>,

    /// The bots that are currently part of the swarm, keyed by their ECS
    /// entity. A bot is inserted when it's added and removed once its event
    /// channel closes.
    bots: Arc<Mutex<HashMap<Entity, Client>>>,

    // the address is public and mutable so plugins can change it
    /// The resolved socket address that bots connect to when their join
    /// options don't provide a `custom_resolved_address`.
    pub resolved_address: Arc<RwLock<SocketAddr>>,
    /// The server address that bots use when their join options don't provide
    /// a `custom_address`.
    pub address: Arc<RwLock<ServerAddress>>,

    /// The instance container created together with the swarm.
    // NOTE(review): nothing in this file reads it after construction;
    // presumably the swarm plugins use it - confirm.
    pub instance_container: Arc<RwLock<InstanceContainer>>,

    /// Sender for per-bot events. Each bot's event-copying task forwards the
    /// bot's events here, and `SwarmBuilder::start_with_default_opts` receives
    /// them and calls the bot handler.
    bots_tx: mpsc::UnboundedSender<(Option<Event>, Client)>,
    /// Sender for swarm-wide [`SwarmEvent`]s, which are passed to the swarm
    /// handler.
    swarm_tx: mpsc::UnboundedSender<SwarmEvent>,
}
59
/// Create a new [`Swarm`].
///
/// Accounts, handlers and plugins are collected on the builder, and the swarm
/// is only created once [`SwarmBuilder::start`] is called.
///
/// The generics of this struct stand for the following:
/// - S: State
/// - SS: Swarm State
/// - R: Return type of the handler
/// - SR: Return type of the swarm handler
///
/// You shouldn't have to manually set them though, they'll be inferred for you.
pub struct SwarmBuilder<S, SS, R, SR>
where
    S: Send + Sync + Clone + Component + 'static,
    SS: Default + Send + Sync + Clone + Resource + 'static,
{
    /// The app that plugins are added to. It's handed over to the ECS runner
    /// when the swarm is started.
    pub(crate) app: App,
    /// The accounts and proxies that are going to join the server.
    pub(crate) accounts: Vec<(Account, JoinOpts)>,
    /// The individual bot states. This must be the same length as `accounts`,
    /// since each bot gets one state. Starting the swarm asserts this.
    pub(crate) states: Vec<S>,
    /// The state for the overall swarm.
    pub(crate) swarm_state: SS,
    /// The function that's called every time a bot receives an [`Event`].
    /// `None` means that bot events are received but not handled.
    pub(crate) handler: Option<BoxHandleFn<S, R>>,
    /// The function that's called every time the swarm receives a
    /// [`SwarmEvent`]. `None` means that swarm events are received but not
    /// handled.
    pub(crate) swarm_handler: Option<BoxSwarmHandleFn<SS, SR>>,

    /// How long we should wait between each bot joining the server. Set to
    /// None to have every bot connect at the same time. None is different than
    /// a duration of 0, since if a duration is present the bots will wait for
    /// the previous one to be ready.
    pub(crate) join_delay: Option<std::time::Duration>,
}
94impl SwarmBuilder<NoState, NoSwarmState, (), ()> {
95    /// Start creating the swarm.
96    #[must_use]
97    pub fn new() -> Self {
98        Self::new_without_plugins()
99            .add_plugins(DefaultPlugins)
100            .add_plugins(DefaultBotPlugins)
101            .add_plugins(DefaultSwarmPlugins)
102    }
103
104    /// [`Self::new`] but without adding the plugins by default. This is useful
105    /// if you want to disable a default plugin.
106    ///
107    /// You **must** add [`DefaultPlugins`], [`DefaultBotPlugins`], and
108    /// [`DefaultSwarmPlugins`] to this.
109    ///
110    /// ```
111    /// # use azalea::{prelude::*, swarm::prelude::*};
112    /// use azalea::{app::PluginGroup, DefaultBotPlugins, DefaultPlugins, swarm::{DefaultSwarmPlugins}};
113    /// use bevy_log::LogPlugin;
114    ///
115    /// let swarm_builder = SwarmBuilder::new_without_plugins()
116    ///     .add_plugins(DefaultPlugins.build().disable::<LogPlugin>())
117    ///     .add_plugins(DefaultBotPlugins)
118    ///     .add_plugins(DefaultSwarmPlugins);
119    /// # swarm_builder.set_handler(handle).set_swarm_handler(swarm_handle);
120    /// # #[derive(Component, Resource, Clone, Default)]
121    /// # pub struct State;
122    /// # async fn handle(mut bot: Client, event: Event, state: State) -> anyhow::Result<()> {
123    /// #     Ok(())
124    /// # }
125    /// # async fn swarm_handle(swarm: Swarm, event: SwarmEvent, state: State) -> anyhow::Result<()> {
126    /// #     Ok(())
127    /// # }
128    /// ```
129    #[must_use]
130    pub fn new_without_plugins() -> Self {
131        SwarmBuilder {
132            // we create the app here so plugins can add onto it.
133            // the schedules won't run until [`Self::start`] is called.
134            app: App::new(),
135            accounts: Vec::new(),
136            states: Vec::new(),
137            swarm_state: NoSwarmState,
138            handler: None,
139            swarm_handler: None,
140            join_delay: None,
141        }
142    }
143}
144
145impl<SS, SR> SwarmBuilder<NoState, SS, (), SR>
146where
147    SS: Default + Send + Sync + Clone + Resource + 'static,
148{
149    /// Set the function that's called every time a bot receives an [`Event`].
150    /// This is the way to handle normal per-bot events.
151    ///
152    /// Currently you can have up to one handler.
153    ///
154    /// ```
155    /// # use azalea::{prelude::*, swarm::prelude::*};
156    /// # let swarm_builder = SwarmBuilder::new().set_swarm_handler(swarm_handle);
157    /// swarm_builder.set_handler(handle);
158    ///
159    /// #[derive(Component, Default, Clone)]
160    /// struct State {}
161    /// async fn handle(mut bot: Client, event: Event, state: State) -> anyhow::Result<()> {
162    ///     Ok(())
163    /// }
164    ///
165    /// # #[derive(Resource, Default, Clone)]
166    /// # struct SwarmState {}
167    /// # async fn swarm_handle(
168    /// #     mut swarm: Swarm,
169    /// #     event: SwarmEvent,
170    /// #     state: SwarmState,
171    /// # ) -> anyhow::Result<()> {
172    /// #     Ok(())
173    /// # }
174    /// ```
175    #[must_use]
176    pub fn set_handler<S, Fut, R>(self, handler: HandleFn<S, Fut>) -> SwarmBuilder<S, SS, R, SR>
177    where
178        Fut: Future<Output = R> + Send + 'static,
179        S: Send + Sync + Clone + Component + Default + 'static,
180    {
181        SwarmBuilder {
182            handler: Some(Box::new(move |bot, event, state: S| {
183                Box::pin(handler(bot, event, state))
184            })),
185            // if we added accounts before the State was set, we've gotta set it to the default now
186            states: vec![S::default(); self.accounts.len()],
187            app: self.app,
188            ..self
189        }
190    }
191}
192
193impl<S, R> SwarmBuilder<S, NoSwarmState, R, ()>
194where
195    S: Send + Sync + Clone + Component + 'static,
196{
197    /// Set the function that's called every time the swarm receives a
198    /// [`SwarmEvent`]. This is the way to handle global swarm events.
199    ///
200    /// Currently you can have up to one swarm handler.
201    ///
202    /// ```
203    /// # use azalea::{prelude::*, swarm::prelude::*};
204    /// # let swarm_builder = SwarmBuilder::new().set_handler(handle);
205    /// swarm_builder.set_swarm_handler(swarm_handle);
206    ///
207    /// # #[derive(Component, Default, Clone)]
208    /// # struct State {}
209    ///
210    /// # async fn handle(mut bot: Client, event: Event, state: State) -> anyhow::Result<()> {
211    /// #     Ok(())
212    /// # }
213    ///
214    /// #[derive(Resource, Default, Clone)]
215    /// struct SwarmState {}
216    /// async fn swarm_handle(
217    ///     mut swarm: Swarm,
218    ///     event: SwarmEvent,
219    ///     state: SwarmState,
220    /// ) -> anyhow::Result<()> {
221    ///     Ok(())
222    /// }
223    /// ```
224    #[must_use]
225    pub fn set_swarm_handler<SS, Fut, SR>(
226        self,
227        handler: SwarmHandleFn<SS, Fut>,
228    ) -> SwarmBuilder<S, SS, R, SR>
229    where
230        SS: Default + Send + Sync + Clone + Resource + 'static,
231        Fut: Future<Output = SR> + Send + 'static,
232    {
233        SwarmBuilder {
234            handler: self.handler,
235            app: self.app,
236            accounts: self.accounts,
237            states: self.states,
238            swarm_state: SS::default(),
239            swarm_handler: Some(Box::new(move |swarm, event, state| {
240                Box::pin(handler(swarm, event, state))
241            })),
242            join_delay: self.join_delay,
243        }
244    }
245}
246
247impl<S, SS, R, SR> SwarmBuilder<S, SS, R, SR>
248where
249    S: Send + Sync + Clone + Component + 'static,
250    SS: Default + Send + Sync + Clone + Resource + 'static,
251    R: Send + 'static,
252    SR: Send + 'static,
253{
254    /// Add a vec of [`Account`]s to the swarm.
255    ///
256    /// Use [`Self::add_account`] to only add one account. If you want the
257    /// clients to have different default states, add them one at a time with
258    /// [`Self::add_account_with_state`].
259    ///
260    /// By default, every account will join at the same time, you can add a
261    /// delay with [`Self::join_delay`].
262    #[must_use]
263    pub fn add_accounts(mut self, accounts: Vec<Account>) -> Self
264    where
265        S: Default,
266    {
267        for account in accounts {
268            self = self.add_account(account);
269        }
270
271        self
272    }
273
274    /// Add a single new [`Account`] to the swarm. Use [`Self::add_accounts`] to
275    /// add multiple accounts at a time.
276    ///
277    /// This will make the state for this client be the default, use
278    /// [`Self::add_account_with_state`] to avoid that.
279    #[must_use]
280    pub fn add_account(self, account: Account) -> Self
281    where
282        S: Default,
283    {
284        self.add_account_with_state_and_opts(account, S::default(), JoinOpts::default())
285    }
286
287    /// Add an account with a custom initial state. Use just
288    /// [`Self::add_account`] to use the Default implementation for the state.
289    #[must_use]
290    pub fn add_account_with_state(self, account: Account, state: S) -> Self {
291        self.add_account_with_state_and_opts(account, state, JoinOpts::default())
292    }
293
294    /// Add an account with a custom initial state. Use just
295    /// [`Self::add_account`] to use the Default implementation for the state.
296    #[must_use]
297    pub fn add_account_with_opts(self, account: Account, opts: JoinOpts) -> Self
298    where
299        S: Default,
300    {
301        self.add_account_with_state_and_opts(account, S::default(), opts)
302    }
303
304    /// Same as [`Self::add_account_with_state`], but allow passing in custom
305    /// join options.
306    #[must_use]
307    pub fn add_account_with_state_and_opts(
308        mut self,
309        account: Account,
310        state: S,
311        join_opts: JoinOpts,
312    ) -> Self {
313        self.accounts.push((account, join_opts));
314        self.states.push(state);
315        self
316    }
317
318    /// Set the swarm state instead of initializing defaults.
319    #[must_use]
320    pub fn set_swarm_state(mut self, swarm_state: SS) -> Self {
321        self.swarm_state = swarm_state;
322        self
323    }
324
325    /// Add one or more plugins to this swarm.
326    #[must_use]
327    pub fn add_plugins<M>(mut self, plugins: impl Plugins<M>) -> Self {
328        self.app.add_plugins(plugins);
329        self
330    }
331
332    /// Set how long we should wait between each bot joining the server.
333    ///
334    /// By default, every bot will connect at the same time. If you set this
335    /// field, however, the bots will wait for the previous one to have
336    /// connected and *then* they'll wait the given duration.
337    #[must_use]
338    pub fn join_delay(mut self, delay: std::time::Duration) -> Self {
339        self.join_delay = Some(delay);
340        self
341    }
342
343    /// Build this `SwarmBuilder` into an actual [`Swarm`] and join the given
344    /// server.
345    ///
346    /// The `address` argument can be a `&str`, [`ServerAddress`], or anything
347    /// that implements `TryInto<ServerAddress>`.
348    ///
349    /// [`ServerAddress`]: azalea_protocol::ServerAddress
350    pub async fn start(self, address: impl TryInto<ServerAddress>) -> Result<!, StartError> {
351        // convert the TryInto<ServerAddress> into a ServerAddress
352        let address: ServerAddress = match address.try_into() {
353            Ok(address) => address,
354            Err(_) => return Err(StartError::InvalidAddress),
355        };
356
357        self.start_with_default_opts(address, JoinOpts::default())
358            .await
359    }
360
    /// Do the same as [`Self::start`], but allow passing in default join
    /// options for the bots.
    ///
    /// The per-account join options are applied on top of `default_join_opts`.
    /// If `default_join_opts` has a custom address it's used instead of
    /// `address`, and if it has a custom resolved address then `address` isn't
    /// resolved at all.
    ///
    /// On success this never returns: after the swarm is set up, this future
    /// keeps receiving bot events and spawns the handler for each of them.
    ///
    /// # Errors
    ///
    /// Returns [`StartError::InvalidAddress`] if `address` can't be converted
    /// into a [`ServerAddress`], or the error from resolving the address.
    ///
    /// # Panics
    ///
    /// Panics if the number of states differs from the number of accounts.
    pub async fn start_with_default_opts(
        self,
        address: impl TryInto<ServerAddress>,
        default_join_opts: JoinOpts,
    ) -> Result<!, StartError> {
        assert_eq!(
            self.accounts.len(),
            self.states.len(),
            "There must be exactly one state per bot."
        );

        debug!("Starting Azalea {}", env!("CARGO_PKG_VERSION"));

        // convert the TryInto<ServerAddress> into a ServerAddress
        let address = match address.try_into() {
            Ok(address) => address,
            Err(_) => return Err(StartError::InvalidAddress),
        };

        // a custom address in the default join options wins over the argument
        let address: ServerAddress = default_join_opts.custom_address.clone().unwrap_or(address);
        let resolved_address = if let Some(a) = default_join_opts.custom_resolved_address {
            a
        } else {
            resolver::resolve_address(&address).await?
        };

        let instance_container = Arc::new(RwLock::new(InstanceContainer::default()));

        // we can't modify the swarm plugins after this
        let (bots_tx, mut bots_rx) = mpsc::unbounded_channel();
        let (swarm_tx, mut swarm_rx) = mpsc::unbounded_channel();

        // queued before any other sender exists, so Init is the first swarm
        // event that gets received
        swarm_tx.send(SwarmEvent::Init).unwrap();

        let main_schedule_label = self.app.main().update_schedule.unwrap();

        let ecs_lock = start_ecs_runner(self.app);

        let swarm = Swarm {
            ecs_lock: ecs_lock.clone(),
            bots: Arc::new(Mutex::new(HashMap::new())),

            resolved_address: Arc::new(RwLock::new(resolved_address)),
            address: Arc::new(RwLock::new(address)),
            instance_container,

            bots_tx,

            swarm_tx: swarm_tx.clone(),
        };

        // run the main schedule so the startup systems run
        {
            let mut ecs = ecs_lock.lock();
            ecs.insert_resource(swarm.clone());
            ecs.insert_resource(self.swarm_state.clone());
            ecs.run_schedule(main_schedule_label);
            ecs.clear_trackers();
        }

        // SwarmBuilder (self) isn't Send so we have to take all the things we need out
        // of it
        let swarm_clone = swarm.clone();
        let join_delay = self.join_delay;
        let accounts = self.accounts.clone();
        let states = self.states.clone();

        // Join the initial accounts in the background, then announce Login
        // once every one of them has been added.
        tokio::spawn(async move {
            if let Some(join_delay) = join_delay {
                // if there's a join delay, then join one by one
                for ((account, bot_join_opts), state) in accounts.iter().zip(states) {
                    let mut join_opts = default_join_opts.clone();
                    join_opts.update(bot_join_opts);
                    swarm_clone
                        .add_and_retry_forever_with_opts(account, state, &join_opts)
                        .await;
                    tokio::time::sleep(join_delay).await;
                }
            } else {
                // otherwise, join all at once
                let swarm_borrow = &swarm_clone;
                join_all(accounts.iter().zip(states).map(
                    |((account, bot_join_opts), state)| async {
                        let mut join_opts = default_join_opts.clone();
                        join_opts.update(bot_join_opts);
                        swarm_borrow
                            .clone()
                            .add_and_retry_forever_with_opts(account, state, &join_opts)
                            .await;
                    },
                ))
                .await;
            }

            swarm_tx.send(SwarmEvent::Login).unwrap();
        });

        let swarm_state = self.swarm_state;

        // Watch swarm_rx and send those events to the swarm_handle.
        // Each event gets its own task, so a slow handler call doesn't hold up
        // later events. Without a swarm handler the events are just drained.
        let swarm_clone = swarm.clone();
        tokio::spawn(async move {
            while let Some(event) = swarm_rx.recv().await {
                if let Some(swarm_handler) = &self.swarm_handler {
                    tokio::spawn((swarm_handler)(
                        swarm_clone.clone(),
                        event,
                        swarm_state.clone(),
                    ));
                }
            }
        });

        // bot events
        // NOTE(review): the pattern only matches `Some(event)`, so receiving a
        // `(None, _)` message would end this loop and hit the `unreachable!`
        // below. Nothing in this file sends `None` - confirm that no other
        // module does either.
        while let Some((Some(first_event), first_bot)) = bots_rx.recv().await {
            // only warn once per process about a growing backlog
            if bots_rx.len() > 1_000 {
                static WARNED: AtomicBool = AtomicBool::new(false);
                if !WARNED.swap(true, atomic::Ordering::Relaxed) {
                    warn!("the Client Event channel has more than 1000 items!")
                }
            }

            if let Some(handler) = &self.handler {
                // The ECS stays locked for the whole batch below; the guard is
                // released at the end of this block, before the next `recv`.
                let ecs_mutex = first_bot.ecs.clone();
                let mut ecs = ecs_mutex.lock();
                let mut query = ecs.query::<Option<&S>>();
                // NOTE(review): `username()` is called here (and in the inner
                // loop) while the ECS lock is held - confirm that it doesn't
                // try to lock the ECS itself.
                let Ok(Some(first_bot_state)) = query.get(&mut ecs, first_bot.entity) else {
                    error!(
                        "the first bot ({} / {}) is missing the required state component! none of the client handler functions will be called.",
                        first_bot.username(),
                        first_bot.entity
                    );
                    continue;
                };
                let first_bot_entity = first_bot.entity;
                let first_bot_state = first_bot_state.clone();

                tokio::spawn((handler)(first_bot, first_event, first_bot_state.clone()));

                // this makes it not have to keep locking the ecs
                // (events that are already queued are handled in the same
                // batch, and each bot's state is only looked up once per batch)
                let mut states = HashMap::new();
                states.insert(first_bot_entity, first_bot_state);
                while let Ok((Some(event), bot)) = bots_rx.try_recv() {
                    let state = match states.entry(bot.entity) {
                        hash_map::Entry::Occupied(e) => e.into_mut(),
                        hash_map::Entry::Vacant(e) => {
                            let Ok(Some(state)) = query.get(&mut ecs, bot.entity) else {
                                error!(
                                    "one of our bots ({} / {}) is missing the required state component! its client handler function will not be called.",
                                    bot.username(),
                                    bot.entity
                                );
                                continue;
                            };
                            let state = state.clone();
                            e.insert(state)
                        }
                    };
                    tokio::spawn((handler)(bot, event, state.clone()));
                }
            }
        }

        unreachable!(
            "bots_rx.recv() should never be None because the bots_tx channel is never closed"
        );
    }
530}
531
532impl Default for SwarmBuilder<NoState, NoSwarmState, (), ()> {
533    fn default() -> Self {
534        Self::new()
535    }
536}
537
/// An event about something that doesn't have to do with a single bot.
///
/// These are passed to the swarm handler that was set with
/// [`SwarmBuilder::set_swarm_handler`].
#[derive(Clone, Debug)]
#[non_exhaustive]
pub enum SwarmEvent {
    /// All the bots in the swarm have successfully joined the server.
    ///
    /// This is sent once, after every account that was added to the
    /// [`SwarmBuilder`] has joined.
    Login,
    /// The swarm was created. This is only fired once, and it's guaranteed to
    /// be the first event to fire.
    Init,
    /// A bot got disconnected from the server.
    ///
    /// This is sent when the bot's event channel closes, right after the bot
    /// was removed from the swarm.
    ///
    /// You can implement an auto-reconnect by calling [`Swarm::add_with_opts`]
    /// with the account and options from this event.
    Disconnect(Box<Account>, JoinOpts),
    /// At least one bot received a chat message.
    Chat(ChatPacket),
}
555
/// The function pointer type that [`SwarmBuilder::set_swarm_handler`]
/// accepts: it takes the swarm, the event and the swarm state, and returns
/// the future `Fut`.
pub type SwarmHandleFn<SS, Fut> = fn(Swarm, SwarmEvent, SS) -> Fut;
/// The boxed form of a swarm handler as it's stored in the [`SwarmBuilder`],
/// with the returned future boxed as well so the concrete future type is
/// erased.
pub type BoxSwarmHandleFn<SS, R> =
    Box<dyn Fn(Swarm, SwarmEvent, SS) -> BoxFuture<'static, R> + Send>;
559
560/// Make a bot [`Swarm`].
561///
562/// [`Swarm`]: struct.Swarm.html
563///
564/// # Examples
565/// ```rust,no_run
566/// use azalea::{prelude::*, swarm::prelude::*};
567/// use std::time::Duration;
568///
569/// #[derive(Default, Clone, Component)]
570/// struct State {}
571///
572/// #[derive(Default, Clone, Resource)]
573/// struct SwarmState {}
574///
575/// #[tokio::main]
576/// async fn main() {
577///     let mut accounts = Vec::new();
578///     let mut states = Vec::new();
579///
580///     for i in 0..10 {
581///         accounts.push(Account::offline(&format!("bot{i}")));
582///         states.push(State::default());
583///     }
584///
585///     SwarmBuilder::new()
586///         .add_accounts(accounts.clone())
587///         .set_handler(handle)
588///         .set_swarm_handler(swarm_handle)
589///         .join_delay(Duration::from_millis(1000))
590///         .start("localhost")
591///         .await
592///         .unwrap();
593/// }
594///
595/// async fn handle(bot: Client, event: Event, _state: State) -> anyhow::Result<()> {
596///     match &event {
597///         _ => {}
598///     }
599///     Ok(())
600/// }
601///
602/// async fn swarm_handle(
603///     mut swarm: Swarm,
604///     event: SwarmEvent,
605///     _state: SwarmState,
606/// ) -> anyhow::Result<()> {
607///     match &event {
608///         SwarmEvent::Disconnect(account, join_opts) => {
609///             // automatically reconnect after 5 seconds
610///             tokio::time::sleep(Duration::from_secs(5)).await;
611///             swarm.add_with_opts(account, State::default(), join_opts).await?;
612///         }
613///         SwarmEvent::Chat(m) => {
614///             println!("{}", m.message().to_ansi());
615///         }
616///         _ => {}
617///     }
618///     Ok(())
/// }
/// ```
620impl Swarm {
621    /// Add a new account to the swarm. You can remove it later by calling
622    /// [`Client::disconnect`].
623    ///
624    /// # Errors
625    ///
626    /// Returns an `Err` if the bot could not do a handshake successfully.
627    pub async fn add<S: Component + Clone>(
628        &self,
629        account: &Account,
630        state: S,
631    ) -> Result<Client, JoinError> {
632        self.add_with_opts(account, state, &JoinOpts::default())
633            .await
634    }
635    /// Add a new account to the swarm, using custom options. This is useful if
636    /// you want bots in the same swarm to connect to different addresses.
637    /// Usually you'll just want [`Self::add`] though.
638    ///
639    /// # Errors
640    ///
641    /// Returns an `Err` if the bot could not do a handshake successfully.
642    pub async fn add_with_opts<S: Component + Clone>(
643        &self,
644        account: &Account,
645        state: S,
646        join_opts: &JoinOpts,
647    ) -> Result<Client, JoinError> {
648        debug!("add_with_opts called for account {}", account.username);
649
650        let address = join_opts
651            .custom_address
652            .clone()
653            .unwrap_or_else(|| self.address.read().clone());
654        let resolved_address = join_opts
655            .custom_resolved_address
656            .unwrap_or_else(|| *self.resolved_address.read());
657
658        let (tx, rx) = mpsc::unbounded_channel();
659
660        let bot = Client::start_client(StartClientOpts {
661            ecs_lock: self.ecs_lock.clone(),
662            account,
663            address: &address,
664            resolved_address: &resolved_address,
665            proxy: join_opts.proxy.clone(),
666            event_sender: Some(tx),
667        })
668        .await?;
669        // add the state to the client
670        {
671            let mut ecs = self.ecs_lock.lock();
672            ecs.entity_mut(bot.entity).insert(state);
673        }
674
675        self.bots.lock().insert(bot.entity, bot.clone());
676
677        let cloned_bots = self.bots.clone();
678        let cloned_bots_tx = self.bots_tx.clone();
679        let cloned_bot = bot.clone();
680        let swarm_tx = self.swarm_tx.clone();
681        let join_opts = join_opts.clone();
682        tokio::spawn(Self::event_copying_task(
683            rx,
684            cloned_bots,
685            cloned_bots_tx,
686            cloned_bot,
687            swarm_tx,
688            join_opts,
689        ));
690
691        Ok(bot)
692    }
693
694    async fn event_copying_task(
695        mut rx: mpsc::UnboundedReceiver<Event>,
696        cloned_bots: Arc<Mutex<HashMap<Entity, Client>>>,
697        cloned_bots_tx: mpsc::UnboundedSender<(Option<Event>, Client)>,
698        cloned_bot: Client,
699        swarm_tx: mpsc::UnboundedSender<SwarmEvent>,
700        join_opts: JoinOpts,
701    ) {
702        while let Some(event) = rx.recv().await {
703            if rx.len() > 1_000 {
704                static WARNED_1_000: AtomicBool = AtomicBool::new(false);
705                if !WARNED_1_000.swap(true, atomic::Ordering::Relaxed) {
706                    warn!("the client's Event channel has more than 1000 items!")
707                }
708
709                if rx.len() > 10_000 {
710                    static WARNED_10_000: AtomicBool = AtomicBool::new(false);
711                    if !WARNED_10_000.swap(true, atomic::Ordering::Relaxed) {
712                        warn!("the client's Event channel has more than 10,000 items!!")
713                    }
714
715                    if rx.len() > 100_000 {
716                        static WARNED_100_000: AtomicBool = AtomicBool::new(false);
717                        if !WARNED_100_000.swap(true, atomic::Ordering::Relaxed) {
718                            warn!("the client's Event channel has more than 100,000 items!!!")
719                        }
720
721                        if rx.len() > 1_000_000 {
722                            static WARNED_1_000_000: AtomicBool = AtomicBool::new(false);
723                            if !WARNED_1_000_000.swap(true, atomic::Ordering::Relaxed) {
724                                warn!(
725                                    "the client's Event channel has more than 1,000,000 items!!!! i sincerely hope no one ever sees this warning"
726                                )
727                            }
728                        }
729                    }
730                }
731            }
732
733            // we can't handle events here (since we can't copy the handler),
734            // they're handled above in SwarmBuilder::start
735            if let Err(e) = cloned_bots_tx.send((Some(event), cloned_bot.clone())) {
736                error!("Error sending event to swarm: {e}");
737            }
738        }
739        debug!("client sender ended, removing from cloned_bots and sending SwarmEvent::Disconnect");
740
741        cloned_bots.lock().remove(&cloned_bot.entity);
742        let account = cloned_bot
743            .get_component::<Account>()
744            .expect("bot is missing required Account component");
745        swarm_tx
746            .send(SwarmEvent::Disconnect(Box::new(account), join_opts))
747            .unwrap();
748    }
749
750    /// Add a new account to the swarm, retrying if it couldn't join. This will
751    /// run forever until the bot joins or the task is aborted.
752    ///
753    /// This does exponential backoff (though very limited), starting at 5
754    /// seconds and doubling up to 15 seconds.
755    pub async fn add_and_retry_forever<S: Component + Clone>(
756        &self,
757        account: &Account,
758        state: S,
759    ) -> Client {
760        self.add_and_retry_forever_with_opts(account, state, &JoinOpts::default())
761            .await
762    }
763
764    /// Same as [`Self::add_and_retry_forever`], but allow passing custom join
765    /// options.
766    pub async fn add_and_retry_forever_with_opts<S: Component + Clone>(
767        &self,
768        account: &Account,
769        state: S,
770        opts: &JoinOpts,
771    ) -> Client {
772        let mut disconnects = 0;
773        loop {
774            match self.add_with_opts(account, state.clone(), opts).await {
775                Ok(bot) => return bot,
776                Err(e) => {
777                    disconnects += 1;
778                    let delay = (Duration::from_secs(5) * 2u32.pow(disconnects.min(16)))
779                        .min(Duration::from_secs(15));
780                    let username = account.username.clone();
781
782                    match &e {
783                        JoinError::Disconnect { reason } => {
784                            error!(
785                                "Error joining as {username}, server says: \"{reason}\". Waiting {delay:?} and trying again."
786                            );
787                        }
788                        _ => {
789                            error!(
790                                "Error joining as {username}: {e}. Waiting {delay:?} and trying again."
791                            );
792                        }
793                    }
794
795                    tokio::time::sleep(delay).await;
796                }
797            }
798        }
799    }
800}
801
802impl IntoIterator for Swarm {
803    type Item = Client;
804    type IntoIter = std::vec::IntoIter<Self::Item>;
805
806    /// Iterate over the bots in this swarm.
807    ///
808    /// ```rust,no_run
809    /// # use azalea::{prelude::*, swarm::prelude::*};
810    /// #[derive(Component, Clone)]
811    /// # pub struct State;
812    /// # fn example(swarm: Swarm) {
813    /// for bot in swarm {
814    ///     let state = bot.component::<State>();
815    ///     // ...
816    /// }
817    /// # }
818    /// ```
819    fn into_iter(self) -> Self::IntoIter {
820        self.bots
821            .lock()
822            .clone()
823            .into_values()
824            .collect::<Vec<_>>()
825            .into_iter()
826    }
827}
828
829/// This plugin group will add all the default plugins necessary for swarms to
830/// work.
831pub struct DefaultSwarmPlugins;
832
833impl PluginGroup for DefaultSwarmPlugins {
834    fn build(self) -> PluginGroupBuilder {
835        PluginGroupBuilder::start::<Self>()
836            .add(chat::SwarmChatPlugin)
837            .add(events::SwarmPlugin)
838    }
839}
840
/// A marker that can be used in place of a SwarmState in [`SwarmBuilder`]. You
/// probably don't need to use this manually since the compiler will infer it
/// for you.
///
/// This is the swarm state type of a builder for as long as no swarm handler
/// has been set.
#[derive(Resource, Clone, Default)]
pub struct NoSwarmState;