azalea/swarm/
mod.rs

1//! Swarms are a way to conveniently control many bots.
2//!
3//! See [`Swarm`] for more information.
4
5mod chat;
6mod events;
7pub mod prelude;
8
9use std::{
10    collections::{HashMap, hash_map},
11    future::Future,
12    mem,
13    net::SocketAddr,
14    sync::{
15        Arc,
16        atomic::{self, AtomicBool},
17    },
18    time::Duration,
19};
20
21use azalea_client::{
22    Account, Client, DefaultPlugins, Event, JoinError, StartClientOpts,
23    auto_reconnect::{AutoReconnectDelay, DEFAULT_RECONNECT_DELAY},
24    chat::ChatPacket,
25    join::ConnectOpts,
26    start_ecs_runner,
27};
28use azalea_entity::LocalEntity;
29use azalea_protocol::{ServerAddress, resolver};
30use azalea_world::InstanceContainer;
31use bevy_app::{App, PluginGroup, PluginGroupBuilder, Plugins, SubApp};
32use bevy_ecs::prelude::*;
33use futures::future::{BoxFuture, join_all};
34use parking_lot::{Mutex, RwLock};
35use tokio::sync::mpsc;
36use tracing::{debug, error, warn};
37
38use crate::{BoxHandleFn, DefaultBotPlugins, HandleFn, JoinOpts, NoState, StartError};
39
40/// A swarm is a way to conveniently control many bots at once, while also
41/// being able to control bots at an individual level when desired.
42///
43/// It can safely be cloned, so there should be no need to wrap them in a Mutex.
44///
45/// Swarms are created from [`SwarmBuilder`].
46///
47/// Clients can be added to the swarm later via [`Swarm::add`], and can be
48/// removed with [`Client::disconnect`].
49#[derive(Clone, Resource)]
50pub struct Swarm {
51    pub ecs_lock: Arc<Mutex<World>>,
52
53    // the address is public and mutable so plugins can change it
54    pub resolved_address: Arc<RwLock<SocketAddr>>,
55    pub address: Arc<RwLock<ServerAddress>>,
56
57    pub instance_container: Arc<RwLock<InstanceContainer>>,
58
59    /// This is used internally to make the client handler function work.
60    bots_tx: mpsc::UnboundedSender<(Option<Event>, Client)>,
61    /// This is used internally to make the swarm handler function work.
62    swarm_tx: mpsc::UnboundedSender<SwarmEvent>,
63}
64
65/// Create a new [`Swarm`].
66///
67/// The generics of this struct stand for the following:
68/// - S: State
69/// - SS: Swarm State
70/// - R: Return type of the handler
71/// - SR: Return type of the swarm handler
72///
73/// You shouldn't have to manually set them though, they'll be inferred for you.
74pub struct SwarmBuilder<S, SS, R, SR>
75where
76    S: Send + Sync + Clone + Component + 'static,
77    SS: Default + Send + Sync + Clone + Resource + 'static,
78    Self: Send,
79{
80    // SubApp is used instead of App to make it Send
81    pub(crate) app: SubApp,
82    /// The accounts and proxies that are going to join the server.
83    pub(crate) accounts: Vec<(Account, JoinOpts)>,
84    /// The individual bot states. This must be the same length as `accounts`,
85    /// since each bot gets one state.
86    pub(crate) states: Vec<S>,
87    /// The state for the overall swarm.
88    pub(crate) swarm_state: SS,
89    /// The function that's called every time a bot receives an [`Event`].
90    pub(crate) handler: Option<BoxHandleFn<S, R>>,
91    /// The function that's called every time the swarm receives a
92    /// [`SwarmEvent`].
93    pub(crate) swarm_handler: Option<BoxSwarmHandleFn<SS, SR>>,
94
95    /// How long we should wait between each bot joining the server. Set to
96    /// None to have every bot connect at the same time. None is different than
97    /// a duration of 0, since if a duration is present the bots will wait for
98    /// the previous one to be ready.
99    pub(crate) join_delay: Option<Duration>,
100
101    /// The default reconnection delay for our bots. This will change the value
102    /// of the `AutoReconnectDelay` resource.
103    pub(crate) reconnect_after: Option<Duration>,
104}
105impl SwarmBuilder<NoState, NoSwarmState, (), ()> {
106    /// Start creating the swarm.
107    #[must_use]
108    pub fn new() -> Self {
109        Self::new_without_plugins()
110            .add_plugins(DefaultPlugins)
111            .add_plugins(DefaultBotPlugins)
112            .add_plugins(DefaultSwarmPlugins)
113    }
114
115    /// [`Self::new`] but without adding the plugins by default.
116    ///
117    /// This is useful if you want to disable a default plugin. This also exists
118    /// for `ClientBuilder`, see [`ClientBuilder::new_without_plugins`].
119    ///
120    /// You **must** add [`DefaultPlugins`], [`DefaultBotPlugins`], and
121    /// [`DefaultSwarmPlugins`] to this.
122    ///
123    /// ```
124    /// # use azalea::{prelude::*, swarm::prelude::*};
125    /// use azalea::app::PluginGroup;
126    ///
127    /// let swarm_builder = SwarmBuilder::new_without_plugins()
128    ///     .add_plugins(azalea::DefaultPlugins.build().disable::<azalea::chat_signing::ChatSigningPlugin>())
129    ///     .add_plugins(azalea::DefaultBotPlugins)
130    ///     .add_plugins(azalea::swarm::DefaultSwarmPlugins);
131    /// # swarm_builder.set_handler(handle).set_swarm_handler(swarm_handle);
132    /// # #[derive(Component, Resource, Clone, Default)]
133    /// # pub struct State;
134    /// # async fn handle(mut bot: Client, event: Event, state: State) -> anyhow::Result<()> {
135    /// #     Ok(())
136    /// # }
137    /// # async fn swarm_handle(swarm: Swarm, event: SwarmEvent, state: State) -> anyhow::Result<()> {
138    /// #     Ok(())
139    /// # }
140    /// ```
141    ///
142    /// [`ClientBuilder::new_without_plugins`]: crate::ClientBuilder::new_without_plugins
143    #[must_use]
144    pub fn new_without_plugins() -> Self {
145        SwarmBuilder {
146            // we create the app here so plugins can add onto it.
147            // the schedules won't run until [`Self::start`] is called.
148
149            // `App::new()` is used instead of `SubApp::new()` so the necessary resources are
150            // initialized
151            app: mem::take(App::new().main_mut()),
152            accounts: Vec::new(),
153            states: Vec::new(),
154            swarm_state: NoSwarmState,
155            handler: None,
156            swarm_handler: None,
157            join_delay: None,
158            reconnect_after: Some(DEFAULT_RECONNECT_DELAY),
159        }
160    }
161}
162
163impl<SS, SR> SwarmBuilder<NoState, SS, (), SR>
164where
165    SS: Default + Send + Sync + Clone + Resource + 'static,
166{
167    /// Set the function that's called every time a bot receives an
168    /// [`enum@Event`]. This is the way to handle normal per-bot events.
169    ///
170    /// Currently you can have up to one handler.
171    ///
172    /// Note that if you're creating clients directly from the ECS using
173    /// [`StartJoinServerEvent`] and the client wasn't already in the ECS, then
174    /// the handler function won't be called for that client. This also applies
175    /// to [`SwarmBuilder::set_swarm_handler`]. This shouldn't be a concern for
176    /// most bots, though.
177    ///
178    /// ```
179    /// # use azalea::{prelude::*, swarm::prelude::*};
180    /// # let swarm_builder = SwarmBuilder::new().set_swarm_handler(swarm_handle);
181    /// swarm_builder.set_handler(handle);
182    ///
183    /// #[derive(Component, Default, Clone)]
184    /// struct State {}
185    /// async fn handle(mut bot: Client, event: Event, state: State) -> anyhow::Result<()> {
186    ///     Ok(())
187    /// }
188    ///
189    /// # #[derive(Resource, Default, Clone)]
190    /// # struct SwarmState {}
191    /// # async fn swarm_handle(
192    /// #     mut swarm: Swarm,
193    /// #     event: SwarmEvent,
194    /// #     state: SwarmState,
195    /// # ) -> anyhow::Result<()> {
196    /// #     Ok(())
197    /// # }
198    /// ```
199    ///
200    /// [`StartJoinServerEvent`]: azalea_client::join::StartJoinServerEvent
201    #[must_use]
202    pub fn set_handler<S, Fut, R>(self, handler: HandleFn<S, Fut>) -> SwarmBuilder<S, SS, R, SR>
203    where
204        Fut: Future<Output = R> + Send + 'static,
205        S: Send + Sync + Clone + Component + Default + 'static,
206    {
207        SwarmBuilder {
208            handler: Some(Box::new(move |bot, event, state: S| {
209                Box::pin(handler(bot, event, state))
210            })),
211            // if we added accounts before the State was set, we've gotta set it to the default now
212            states: vec![S::default(); self.accounts.len()],
213            app: self.app,
214            ..self
215        }
216    }
217}
218
219impl<S, R> SwarmBuilder<S, NoSwarmState, R, ()>
220where
221    S: Send + Sync + Clone + Component + 'static,
222{
223    /// Set the function that's called every time the swarm receives a
224    /// [`SwarmEvent`]. This is the way to handle global swarm events.
225    ///
226    /// Currently you can have up to one swarm handler.
227    ///
228    /// Note that if you're creating clients directly from the ECS using
229    /// [`StartJoinServerEvent`] and the client wasn't already in the ECS, then
230    /// this handler function won't be called for that client. This also applies
231    /// to [`SwarmBuilder::set_handler`]. This shouldn't be a concern for
232    /// most bots, though.
233    ///
234    /// ```
235    /// # use azalea::{prelude::*, swarm::prelude::*};
236    /// # let swarm_builder = SwarmBuilder::new().set_handler(handle);
237    /// swarm_builder.set_swarm_handler(swarm_handle);
238    ///
239    /// # #[derive(Component, Default, Clone)]
240    /// # struct State {}
241    ///
242    /// # async fn handle(mut bot: Client, event: Event, state: State) -> anyhow::Result<()> {
243    /// #     Ok(())
244    /// # }
245    ///
246    /// #[derive(Resource, Default, Clone)]
247    /// struct SwarmState {}
248    /// async fn swarm_handle(
249    ///     mut swarm: Swarm,
250    ///     event: SwarmEvent,
251    ///     state: SwarmState,
252    /// ) -> anyhow::Result<()> {
253    ///     Ok(())
254    /// }
255    /// ```
256    ///
257    /// [`StartJoinServerEvent`]: azalea_client::join::StartJoinServerEvent
258    #[must_use]
259    pub fn set_swarm_handler<SS, Fut, SR>(
260        self,
261        handler: SwarmHandleFn<SS, Fut>,
262    ) -> SwarmBuilder<S, SS, R, SR>
263    where
264        SS: Default + Send + Sync + Clone + Resource + 'static,
265        Fut: Future<Output = SR> + Send + 'static,
266    {
267        SwarmBuilder {
268            handler: self.handler,
269            app: self.app,
270            accounts: self.accounts,
271            states: self.states,
272            swarm_state: SS::default(),
273            swarm_handler: Some(Box::new(move |swarm, event, state| {
274                Box::pin(handler(swarm, event, state))
275            })),
276            join_delay: self.join_delay,
277            reconnect_after: self.reconnect_after,
278        }
279    }
280}
281
282impl<S, SS, R, SR> SwarmBuilder<S, SS, R, SR>
283where
284    S: Send + Sync + Clone + Component + 'static,
285    SS: Default + Send + Sync + Clone + Resource + 'static,
286    R: Send + 'static,
287    SR: Send + 'static,
288{
289    /// Add a vec of [`Account`]s to the swarm.
290    ///
291    /// Use [`Self::add_account`] to only add one account. If you want the
292    /// clients to have different default states, add them one at a time with
293    /// [`Self::add_account_with_state`].
294    ///
295    /// By default, every account will join at the same time, you can add a
296    /// delay with [`Self::join_delay`].
297    #[must_use]
298    pub fn add_accounts(mut self, accounts: Vec<Account>) -> Self
299    where
300        S: Default,
301    {
302        for account in accounts {
303            self = self.add_account(account);
304        }
305
306        self
307    }
308
309    /// Add a single new [`Account`] to the swarm. Use [`Self::add_accounts`] to
310    /// add multiple accounts at a time.
311    ///
312    /// This will make the state for this client be the default, use
313    /// [`Self::add_account_with_state`] to avoid that.
314    #[must_use]
315    pub fn add_account(self, account: Account) -> Self
316    where
317        S: Default,
318    {
319        self.add_account_with_state_and_opts(account, S::default(), JoinOpts::default())
320    }
321
322    /// Add an account with a custom initial state. Use just
323    /// [`Self::add_account`] to use the Default implementation for the state.
324    #[must_use]
325    pub fn add_account_with_state(self, account: Account, state: S) -> Self {
326        self.add_account_with_state_and_opts(account, state, JoinOpts::default())
327    }
328
329    /// Add an account with a custom initial state. Use just
330    /// [`Self::add_account`] to use the Default implementation for the state.
331    #[must_use]
332    pub fn add_account_with_opts(self, account: Account, opts: JoinOpts) -> Self
333    where
334        S: Default,
335    {
336        self.add_account_with_state_and_opts(account, S::default(), opts)
337    }
338
339    /// Same as [`Self::add_account_with_state`], but allow passing in custom
340    /// join options.
341    #[must_use]
342    pub fn add_account_with_state_and_opts(
343        mut self,
344        account: Account,
345        state: S,
346        join_opts: JoinOpts,
347    ) -> Self {
348        self.accounts.push((account, join_opts));
349        self.states.push(state);
350        self
351    }
352
353    /// Set the swarm state instead of initializing defaults.
354    #[must_use]
355    pub fn set_swarm_state(mut self, swarm_state: SS) -> Self {
356        self.swarm_state = swarm_state;
357        self
358    }
359
360    /// Add one or more plugins to this swarm.
361    ///
362    /// See [`Self::new_without_plugins`] to learn how to disable default
363    /// plugins.
364    #[must_use]
365    pub fn add_plugins<M>(mut self, plugins: impl Plugins<M>) -> Self {
366        self.app.add_plugins(plugins);
367        self
368    }
369
370    /// Set how long we should wait between each bot joining the server.
371    ///
372    /// By default, every bot will connect at the same time. If you set this
373    /// field, however, the bots will wait for the previous one to have
374    /// connected and *then* they'll wait the given duration.
375    #[must_use]
376    pub fn join_delay(mut self, delay: Duration) -> Self {
377        self.join_delay = Some(delay);
378        self
379    }
380
381    /// Configures the auto-reconnection behavior for our bots.
382    ///
383    /// If this is `Some`, then it'll set the default reconnection delay for our
384    /// bots (how long they'll wait after being kicked before they try
385    /// rejoining). if it's `None`, then auto-reconnecting will be disabled.
386    ///
387    /// If this function isn't called, then our clients will reconnect after
388    /// [`DEFAULT_RECONNECT_DELAY`].
389    #[must_use]
390    pub fn reconnect_after(mut self, delay: impl Into<Option<Duration>>) -> Self {
391        self.reconnect_after = delay.into();
392        self
393    }
394
395    /// Build this `SwarmBuilder` into an actual [`Swarm`] and join the given
396    /// server.
397    ///
398    /// The `address` argument can be a `&str`, [`ServerAddress`], or anything
399    /// that implements `TryInto<ServerAddress>`.
400    ///
401    /// [`ServerAddress`]: azalea_protocol::ServerAddress
402    pub async fn start(self, address: impl TryInto<ServerAddress>) -> Result<!, StartError> {
403        // convert the TryInto<ServerAddress> into a ServerAddress
404        let address: ServerAddress = match address.try_into() {
405            Ok(address) => address,
406            Err(_) => return Err(StartError::InvalidAddress),
407        };
408
409        self.start_with_default_opts(address, JoinOpts::default())
410            .await
411    }
412
413    /// Do the same as [`Self::start`], but allow passing in default join
414    /// options for the bots.
415    pub async fn start_with_default_opts(
416        mut self,
417        address: impl TryInto<ServerAddress>,
418        default_join_opts: JoinOpts,
419    ) -> Result<!, StartError> {
420        assert_eq!(
421            self.accounts.len(),
422            self.states.len(),
423            "There must be exactly one state per bot."
424        );
425
426        debug!("Starting Azalea {}", env!("CARGO_PKG_VERSION"));
427
428        // convert the TryInto<ServerAddress> into a ServerAddress
429        let address = match address.try_into() {
430            Ok(address) => address,
431            Err(_) => return Err(StartError::InvalidAddress),
432        };
433
434        let address: ServerAddress = default_join_opts.custom_address.clone().unwrap_or(address);
435        let resolved_address = if let Some(a) = default_join_opts.custom_resolved_address {
436            a
437        } else {
438            resolver::resolve_address(&address).await?
439        };
440
441        let instance_container = Arc::new(RwLock::new(InstanceContainer::default()));
442
443        // we can't modify the swarm plugins after this
444        let (bots_tx, mut bots_rx) = mpsc::unbounded_channel();
445        let (swarm_tx, mut swarm_rx) = mpsc::unbounded_channel();
446
447        swarm_tx.send(SwarmEvent::Init).unwrap();
448
449        let main_schedule_label = self.app.update_schedule.unwrap();
450
451        let (ecs_lock, start_running_systems) = start_ecs_runner(&mut self.app);
452
453        let swarm = Swarm {
454            ecs_lock: ecs_lock.clone(),
455
456            resolved_address: Arc::new(RwLock::new(resolved_address)),
457            address: Arc::new(RwLock::new(address)),
458            instance_container,
459
460            bots_tx,
461
462            swarm_tx: swarm_tx.clone(),
463        };
464
465        // run the main schedule so the startup systems run
466        {
467            let mut ecs = ecs_lock.lock();
468            ecs.insert_resource(swarm.clone());
469            ecs.insert_resource(self.swarm_state.clone());
470            if let Some(reconnect_after) = self.reconnect_after {
471                ecs.insert_resource(AutoReconnectDelay {
472                    delay: reconnect_after,
473                });
474            } else {
475                ecs.remove_resource::<AutoReconnectDelay>();
476            }
477            ecs.run_schedule(main_schedule_label);
478            ecs.clear_trackers();
479        }
480
481        // only do this after we inserted the Swarm and state resources to avoid errors
482        // where Res<Swarm> is inaccessible
483        start_running_systems();
484
485        // SwarmBuilder (self) isn't Send so we have to take all the things we need out
486        // of it
487        let swarm_clone = swarm.clone();
488        let join_delay = self.join_delay;
489        let accounts = self.accounts.clone();
490        let states = self.states.clone();
491
492        tokio::spawn(async move {
493            if let Some(join_delay) = join_delay {
494                // if there's a join delay, then join one by one
495                for ((account, bot_join_opts), state) in accounts.iter().zip(states) {
496                    let mut join_opts = default_join_opts.clone();
497                    join_opts.update(bot_join_opts);
498                    swarm_clone
499                        .add_and_retry_forever_with_opts(account, state, &join_opts)
500                        .await;
501                    tokio::time::sleep(join_delay).await;
502                }
503            } else {
504                // otherwise, join all at once
505                let swarm_borrow = &swarm_clone;
506                join_all(accounts.iter().zip(states).map(
507                    |((account, bot_join_opts), state)| async {
508                        let mut join_opts = default_join_opts.clone();
509                        join_opts.update(bot_join_opts);
510                        swarm_borrow
511                            .clone()
512                            .add_and_retry_forever_with_opts(account, state, &join_opts)
513                            .await;
514                    },
515                ))
516                .await;
517            }
518
519            swarm_tx.send(SwarmEvent::Login).unwrap();
520        });
521
522        let swarm_state = self.swarm_state;
523
524        // Watch swarm_rx and send those events to the swarm_handle.
525        let swarm_clone = swarm.clone();
526        tokio::spawn(async move {
527            while let Some(event) = swarm_rx.recv().await {
528                if let Some(swarm_handler) = &self.swarm_handler {
529                    tokio::spawn((swarm_handler)(
530                        swarm_clone.clone(),
531                        event,
532                        swarm_state.clone(),
533                    ));
534                }
535            }
536        });
537
538        // bot events
539        while let Some((Some(first_event), first_bot)) = bots_rx.recv().await {
540            if bots_rx.len() > 1_000 {
541                static WARNED: AtomicBool = AtomicBool::new(false);
542                if !WARNED.swap(true, atomic::Ordering::Relaxed) {
543                    warn!("the Client Event channel has more than 1000 items!")
544                }
545            }
546
547            if let Some(handler) = &self.handler {
548                let ecs_mutex = first_bot.ecs.clone();
549                let mut ecs = ecs_mutex.lock();
550                let mut query = ecs.query::<Option<&S>>();
551                let Ok(Some(first_bot_state)) = query.get(&ecs, first_bot.entity) else {
552                    error!(
553                        "the first bot ({} / {}) is missing the required state component! none of the client handler functions will be called.",
554                        first_bot.username(),
555                        first_bot.entity
556                    );
557                    continue;
558                };
559                let first_bot_entity = first_bot.entity;
560                let first_bot_state = first_bot_state.clone();
561
562                tokio::spawn((handler)(first_bot, first_event, first_bot_state.clone()));
563
564                // this makes it not have to keep locking the ecs
565                let mut states = HashMap::new();
566                states.insert(first_bot_entity, first_bot_state);
567                while let Ok((Some(event), bot)) = bots_rx.try_recv() {
568                    let state = match states.entry(bot.entity) {
569                        hash_map::Entry::Occupied(e) => e.into_mut(),
570                        hash_map::Entry::Vacant(e) => {
571                            let Ok(Some(state)) = query.get(&ecs, bot.entity) else {
572                                error!(
573                                    "one of our bots ({} / {}) is missing the required state component! its client handler function will not be called.",
574                                    bot.username(),
575                                    bot.entity
576                                );
577                                continue;
578                            };
579                            let state = state.clone();
580                            e.insert(state)
581                        }
582                    };
583                    tokio::spawn((handler)(bot, event, state.clone()));
584                }
585            }
586        }
587
588        unreachable!(
589            "bots_rx.recv() should never be None because the bots_tx channel is never closed"
590        );
591    }
592}
593
594impl Default for SwarmBuilder<NoState, NoSwarmState, (), ()> {
595    fn default() -> Self {
596        Self::new()
597    }
598}
599
600/// An event about something that doesn't have to do with a single bot.
601#[derive(Clone, Debug)]
602#[non_exhaustive]
603pub enum SwarmEvent {
604    /// All the bots in the swarm have successfully joined the server.
605    Login,
606    /// The swarm was created. This is only fired once, and it's guaranteed to
607    /// be the first event to fire.
608    Init,
609    /// A bot got disconnected from the server.
610    ///
611    /// If you'd like to implement special auto-reconnect behavior beyond what's
612    /// built-in, you can disable that with [`SwarmBuilder::reconnect_delay`]
613    /// and then call [`Swarm::add_with_opts`] with the account and options
614    /// from this event.
615    ///
616    /// [`SwarmBuilder::reconnect_delay`]: crate::swarm::SwarmBuilder::reconnect_after
617    Disconnect(Box<Account>, JoinOpts),
618    /// At least one bot received a chat message.
619    Chat(ChatPacket),
620}
621
622pub type SwarmHandleFn<SS, Fut> = fn(Swarm, SwarmEvent, SS) -> Fut;
623pub type BoxSwarmHandleFn<SS, R> =
624    Box<dyn Fn(Swarm, SwarmEvent, SS) -> BoxFuture<'static, R> + Send + Sync>;
625
626/// Make a bot [`Swarm`].
627///
628/// [`Swarm`]: struct.Swarm.html
629///
630/// # Examples
631/// ```rust,no_run
632/// use azalea::{prelude::*, swarm::prelude::*};
633/// use std::time::Duration;
634///
635/// #[derive(Default, Clone, Component)]
636/// struct State {}
637///
638/// #[derive(Default, Clone, Resource)]
639/// struct SwarmState {}
640///
641/// #[tokio::main]
642/// async fn main() {
643///     let mut accounts = Vec::new();
644///     let mut states = Vec::new();
645///
646///     for i in 0..10 {
647///         accounts.push(Account::offline(&format!("bot{i}")));
648///         states.push(State::default());
649///     }
650///
651///     SwarmBuilder::new()
652///         .add_accounts(accounts.clone())
653///         .set_handler(handle)
654///         .set_swarm_handler(swarm_handle)
655///         .join_delay(Duration::from_millis(1000))
656///         .start("localhost")
657///         .await
658///         .unwrap();
659/// }
660///
661/// async fn handle(bot: Client, event: Event, _state: State) -> anyhow::Result<()> {
662///     match &event {
663///         _ => {}
664///     }
665///     Ok(())
666/// }
667///
668/// async fn swarm_handle(
669///     mut swarm: Swarm,
670///     event: SwarmEvent,
671///     _state: SwarmState,
672/// ) -> anyhow::Result<()> {
673///     match &event {
674///         SwarmEvent::Disconnect(account, join_opts) => {
675///             // automatically reconnect after 5 seconds
676///             tokio::time::sleep(Duration::from_secs(5)).await;
677///             swarm.add_with_opts(account, State::default(), join_opts).await?;
678///         }
679///         SwarmEvent::Chat(m) => {
680///             println!("{}", m.message().to_ansi());
681///         }
682///         _ => {}
683///     }
684///     Ok(())
685/// }
686impl Swarm {
687    /// Add a new account to the swarm. You can remove it later by calling
688    /// [`Client::disconnect`].
689    ///
690    /// # Errors
691    ///
692    /// Returns an `Err` if the bot could not do a handshake successfully.
693    pub async fn add<S: Component + Clone>(
694        &self,
695        account: &Account,
696        state: S,
697    ) -> Result<Client, JoinError> {
698        self.add_with_opts(account, state, &JoinOpts::default())
699            .await
700    }
701    /// Add a new account to the swarm, using custom options. This is useful if
702    /// you want bots in the same swarm to connect to different addresses.
703    /// Usually you'll just want [`Self::add`] though.
704    ///
705    /// # Errors
706    ///
707    /// Returns an `Err` if the bot could not do a handshake successfully.
708    pub async fn add_with_opts<S: Component + Clone>(
709        &self,
710        account: &Account,
711        state: S,
712        join_opts: &JoinOpts,
713    ) -> Result<Client, JoinError> {
714        debug!(
715            "add_with_opts called for account {} with opts {join_opts:?}",
716            account.username
717        );
718
719        let address = join_opts
720            .custom_address
721            .clone()
722            .unwrap_or_else(|| self.address.read().clone());
723        let resolved_address = join_opts
724            .custom_resolved_address
725            .unwrap_or_else(|| *self.resolved_address.read());
726        let proxy = join_opts.proxy.clone();
727
728        let (tx, rx) = mpsc::unbounded_channel();
729
730        let bot = Client::start_client(StartClientOpts {
731            ecs_lock: self.ecs_lock.clone(),
732            account: account.clone(),
733            connect_opts: ConnectOpts {
734                address,
735                resolved_address,
736                proxy,
737            },
738            event_sender: Some(tx),
739        })
740        .await?;
741        // add the state to the client
742        {
743            let mut ecs = self.ecs_lock.lock();
744            ecs.entity_mut(bot.entity).insert(state);
745        }
746
747        let cloned_bot = bot.clone();
748        let swarm_tx = self.swarm_tx.clone();
749        let bots_tx = self.bots_tx.clone();
750
751        let join_opts = join_opts.clone();
752        tokio::spawn(Self::event_copying_task(
753            rx, swarm_tx, bots_tx, cloned_bot, join_opts,
754        ));
755
756        Ok(bot)
757    }
758
759    /// Copy the events from a client's receiver into bots_tx, until the bot is
760    /// removed from the ECS.
761    async fn event_copying_task(
762        mut rx: mpsc::UnboundedReceiver<Event>,
763        swarm_tx: mpsc::UnboundedSender<SwarmEvent>,
764        bots_tx: mpsc::UnboundedSender<(Option<Event>, Client)>,
765        bot: Client,
766        join_opts: JoinOpts,
767    ) {
768        while let Some(event) = rx.recv().await {
769            if rx.len() > 1_000 {
770                static WARNED_1_000: AtomicBool = AtomicBool::new(false);
771                if !WARNED_1_000.swap(true, atomic::Ordering::Relaxed) {
772                    warn!("the client's Event channel has more than 1000 items!")
773                }
774
775                if rx.len() > 10_000 {
776                    static WARNED_10_000: AtomicBool = AtomicBool::new(false);
777                    if !WARNED_10_000.swap(true, atomic::Ordering::Relaxed) {
778                        warn!("the client's Event channel has more than 10,000 items!!")
779                    }
780
781                    if rx.len() > 100_000 {
782                        static WARNED_100_000: AtomicBool = AtomicBool::new(false);
783                        if !WARNED_100_000.swap(true, atomic::Ordering::Relaxed) {
784                            warn!("the client's Event channel has more than 100,000 items!!!")
785                        }
786
787                        if rx.len() > 1_000_000 {
788                            static WARNED_1_000_000: AtomicBool = AtomicBool::new(false);
789                            if !WARNED_1_000_000.swap(true, atomic::Ordering::Relaxed) {
790                                warn!(
791                                    "the client's Event channel has more than 1,000,000 items!!!! i sincerely hope no one ever sees this warning"
792                                )
793                            }
794                        }
795                    }
796                }
797            }
798
799            if let Event::Disconnect(_) = event {
800                debug!(
801                    "sending SwarmEvent::Disconnect due to receiving an Event::Disconnect from client {}",
802                    bot.entity
803                );
804                let account = bot
805                    .get_component::<Account>()
806                    .expect("bot is missing required Account component");
807                swarm_tx
808                    .send(SwarmEvent::Disconnect(Box::new(account), join_opts.clone()))
809                    .unwrap();
810            }
811
812            // we can't handle events here (since we can't copy the handler),
813            // they're handled above in SwarmBuilder::start
814            if let Err(e) = bots_tx.send((Some(event), bot.clone())) {
815                error!(
816                    "Error sending event to swarm, aborting event_copying_task for {}: {e}",
817                    bot.entity
818                );
819                break;
820            }
821        }
822        debug!(
823            "client sender ended for {}, this won't trigger SwarmEvent::Disconnect unless the client already sent its own disconnect event",
824            bot.entity
825        );
826    }
827
828    /// Add a new account to the swarm, retrying if it couldn't join. This will
829    /// run forever until the bot joins or the task is aborted.
830    ///
831    /// This does exponential backoff (though very limited), starting at 5
832    /// seconds and doubling up to 15 seconds.
833    pub async fn add_and_retry_forever<S: Component + Clone>(
834        &self,
835        account: &Account,
836        state: S,
837    ) -> Client {
838        self.add_and_retry_forever_with_opts(account, state, &JoinOpts::default())
839            .await
840    }
841
842    /// Same as [`Self::add_and_retry_forever`], but allow passing custom join
843    /// options.
844    pub async fn add_and_retry_forever_with_opts<S: Component + Clone>(
845        &self,
846        account: &Account,
847        state: S,
848        opts: &JoinOpts,
849    ) -> Client {
850        let mut disconnects = 0;
851        loop {
852            match self.add_with_opts(account, state.clone(), opts).await {
853                Ok(bot) => return bot,
854                Err(e) => {
855                    disconnects += 1;
856                    let delay = (Duration::from_secs(5) * 2u32.pow(disconnects.min(16)))
857                        .min(Duration::from_secs(15));
858                    let username = account.username.clone();
859
860                    match &e {
861                        JoinError::Disconnect { reason } => {
862                            error!(
863                                "Error joining as {username}, server says: \"{reason}\". Waiting {delay:?} and trying again."
864                            );
865                        }
866                        _ => {
867                            error!(
868                                "Error joining as {username}: {e}. Waiting {delay:?} and trying again."
869                            );
870                        }
871                    }
872
873                    tokio::time::sleep(delay).await;
874                }
875            }
876        }
877    }
878
879    /// Get an array of ECS [`Entity`]s for all [`LocalEntity`]s in our world.
880    /// This will include clients that were disconnected without being removed
881    /// from the ECS.
882    ///
883    /// [`LocalEntity`]: azalea_entity::LocalEntity
884    pub fn client_entities(&self) -> Box<[Entity]> {
885        let mut ecs = self.ecs_lock.lock();
886        let mut query = ecs.query_filtered::<Entity, With<LocalEntity>>();
887        query.iter(&ecs).collect::<Box<[Entity]>>()
888    }
889}
890
891impl IntoIterator for Swarm {
892    type Item = Client;
893    type IntoIter = std::vec::IntoIter<Self::Item>;
894
895    /// Iterate over the bots in this swarm.
896    ///
897    /// ```rust,no_run
898    /// # use azalea::{prelude::*, swarm::prelude::*};
899    /// #[derive(Component, Clone)]
900    /// # pub struct State;
901    /// # fn example(swarm: Swarm) {
902    /// for bot in swarm {
903    ///     let state = bot.component::<State>();
904    ///     // ...
905    /// }
906    /// # }
907    /// ```
908    fn into_iter(self) -> Self::IntoIter {
909        let client_entities = self.client_entities();
910
911        client_entities
912            .into_iter()
913            .map(|entity| Client::new(entity, self.ecs_lock.clone()))
914            .collect::<Box<[Client]>>()
915            .into_iter()
916    }
917}
918
919/// This plugin group will add all the default plugins necessary for swarms to
920/// work.
921pub struct DefaultSwarmPlugins;
922
923impl PluginGroup for DefaultSwarmPlugins {
924    fn build(self) -> PluginGroupBuilder {
925        PluginGroupBuilder::start::<Self>()
926            .add(chat::SwarmChatPlugin)
927            .add(events::SwarmPlugin)
928    }
929}
930
931/// A marker that can be used in place of a SwarmState in [`SwarmBuilder`]. You
932/// probably don't need to use this manually since the compiler will infer it
933/// for you.
934#[derive(Resource, Clone, Default)]
935pub struct NoSwarmState;