azalea/swarm/
mod.rs

1//! Swarms are a way to conveniently control many bots.
2//!
3//! See [`Swarm`] for more information.
4
5mod chat;
6mod events;
7pub mod prelude;
8
9use std::{
10    collections::{HashMap, hash_map},
11    future::Future,
12    mem,
13    net::SocketAddr,
14    sync::{
15        Arc,
16        atomic::{self, AtomicBool},
17    },
18    time::Duration,
19};
20
21use azalea_client::{
22    Account, Client, DefaultPlugins, Event, JoinError, StartClientOpts,
23    auto_reconnect::{AutoReconnectDelay, DEFAULT_RECONNECT_DELAY},
24    chat::ChatPacket,
25    join::ConnectOpts,
26    start_ecs_runner,
27};
28use azalea_entity::LocalEntity;
29use azalea_protocol::{ServerAddress, resolver};
30use azalea_world::InstanceContainer;
31use bevy_app::{App, AppExit, PluginGroup, PluginGroupBuilder, Plugins, SubApp};
32use bevy_ecs::prelude::*;
33use futures::future::{BoxFuture, join_all};
34use parking_lot::{Mutex, RwLock};
35use tokio::{sync::mpsc, time::sleep};
36use tracing::{debug, error, warn};
37
38use crate::{BoxHandleFn, DefaultBotPlugins, HandleFn, JoinOpts, NoState, StartError};
39
40/// A swarm is a way to conveniently control many bots at once, while also
41/// being able to control bots at an individual level when desired.
42///
43/// It can safely be cloned, so there should be no need to wrap them in a Mutex.
44///
45/// Swarms are created from [`SwarmBuilder`].
46///
47/// Clients can be added to the swarm later via [`Swarm::add`], and can be
48/// removed with [`Client::disconnect`].
49#[derive(Clone, Resource)]
50pub struct Swarm {
51    pub ecs_lock: Arc<Mutex<World>>,
52
53    // the address is public and mutable so plugins can change it
54    pub resolved_address: Arc<RwLock<SocketAddr>>,
55    pub address: Arc<RwLock<ServerAddress>>,
56
57    pub instance_container: Arc<RwLock<InstanceContainer>>,
58
59    /// This is used internally to make the client handler function work.
60    bots_tx: mpsc::UnboundedSender<(Option<Event>, Client)>,
61    /// This is used internally to make the swarm handler function work.
62    swarm_tx: mpsc::UnboundedSender<SwarmEvent>,
63}
64
65/// Create a new [`Swarm`].
66///
67/// The generics of this struct stand for the following:
68/// - S: State
69/// - SS: Swarm State
70/// - R: Return type of the handler
71/// - SR: Return type of the swarm handler
72///
73/// You shouldn't have to manually set them though, they'll be inferred for you.
74pub struct SwarmBuilder<S, SS, R, SR>
75where
76    S: Send + Sync + Clone + Component + 'static,
77    SS: Default + Send + Sync + Clone + Resource + 'static,
78    Self: Send,
79{
80    // SubApp is used instead of App to make it Send
81    pub(crate) app: SubApp,
82    /// The accounts and proxies that are going to join the server.
83    pub(crate) accounts: Vec<(Account, JoinOpts)>,
84    /// The individual bot states.
85    ///
86    /// This must be the same length as `accounts`, since each bot gets one
87    /// state.
88    pub(crate) states: Vec<S>,
89    /// The state for the overall swarm.
90    pub(crate) swarm_state: SS,
91    /// The function that's called every time a bot receives an [`Event`].
92    pub(crate) handler: Option<BoxHandleFn<S, R>>,
93    /// The function that's called every time the swarm receives a
94    /// [`SwarmEvent`].
95    pub(crate) swarm_handler: Option<BoxSwarmHandleFn<SS, SR>>,
96
97    /// How long we should wait between each bot joining the server.
98    ///
99    /// If this is None, every bot will connect at the same time. None is
100    /// different than a duration of 0, since if a duration is present the
101    /// bots will wait for the previous one to be ready.
102    pub(crate) join_delay: Option<Duration>,
103
104    /// The default reconnection delay for our bots.
105    ///
106    /// This will change the value of the [`AutoReconnectDelay`] resource.
107    pub(crate) reconnect_after: Option<Duration>,
108}
109impl SwarmBuilder<NoState, NoSwarmState, (), ()> {
110    /// Start creating the swarm.
111    #[must_use]
112    pub fn new() -> Self {
113        Self::new_without_plugins()
114            .add_plugins(DefaultPlugins)
115            .add_plugins(DefaultBotPlugins)
116            .add_plugins(DefaultSwarmPlugins)
117    }
118
119    /// [`Self::new`] but without adding the plugins by default.
120    ///
121    /// This is useful if you want to disable a default plugin. This also exists
122    /// for `ClientBuilder`, see [`ClientBuilder::new_without_plugins`].
123    ///
124    /// You **must** add [`DefaultPlugins`], [`DefaultBotPlugins`], and
125    /// [`DefaultSwarmPlugins`] to this.
126    ///
127    /// ```
128    /// # use azalea::{prelude::*, swarm::prelude::*};
129    /// use azalea::app::PluginGroup;
130    ///
131    /// let swarm_builder = SwarmBuilder::new_without_plugins()
132    ///     .add_plugins(azalea::DefaultPlugins.build().disable::<azalea::chat_signing::ChatSigningPlugin>())
133    ///     .add_plugins(azalea::bot::DefaultBotPlugins)
134    ///     .add_plugins(azalea::swarm::DefaultSwarmPlugins);
135    /// # swarm_builder.set_handler(handle).set_swarm_handler(swarm_handle);
136    /// # #[derive(Component, Resource, Clone, Default)]
137    /// # pub struct State;
138    /// # async fn handle(mut bot: Client, event: Event, state: State) -> anyhow::Result<()> {
139    /// #     Ok(())
140    /// # }
141    /// # async fn swarm_handle(swarm: Swarm, event: SwarmEvent, state: State) -> anyhow::Result<()> {
142    /// #     Ok(())
143    /// # }
144    /// ```
145    ///
146    /// [`ClientBuilder::new_without_plugins`]: crate::ClientBuilder::new_without_plugins
147    #[must_use]
148    pub fn new_without_plugins() -> Self {
149        SwarmBuilder {
150            // we create the app here so plugins can add onto it.
151            // the schedules won't run until [`Self::start`] is called.
152
153            // `App::new()` is used instead of `SubApp::new()` so the necessary resources are
154            // initialized
155            app: mem::take(App::new().main_mut()),
156            accounts: Vec::new(),
157            states: Vec::new(),
158            swarm_state: NoSwarmState,
159            handler: None,
160            swarm_handler: None,
161            join_delay: None,
162            reconnect_after: Some(DEFAULT_RECONNECT_DELAY),
163        }
164    }
165}
166
167impl<SS, SR> SwarmBuilder<NoState, SS, (), SR>
168where
169    SS: Default + Send + Sync + Clone + Resource + 'static,
170{
171    /// Set the function that's called every time a bot receives an
172    /// [`enum@Event`]. This is the intended way to handle normal per-bot
173    /// events.
174    ///
175    /// Currently you can have up to one handler.
176    ///
177    /// Note that if you're creating clients directly from the ECS using
178    /// [`StartJoinServerEvent`] and the client wasn't already in the ECS, then
179    /// the handler function won't be called for that client. This also applies
180    /// to [`SwarmBuilder::set_swarm_handler`]. This shouldn't be a concern for
181    /// most bots, though.
182    ///
183    /// ```
184    /// # use azalea::{prelude::*, swarm::prelude::*};
185    /// # let swarm_builder = SwarmBuilder::new().set_swarm_handler(swarm_handle);
186    /// swarm_builder.set_handler(handle);
187    ///
188    /// #[derive(Component, Default, Clone)]
189    /// struct State {}
190    /// async fn handle(mut bot: Client, event: Event, state: State) -> anyhow::Result<()> {
191    ///     Ok(())
192    /// }
193    ///
194    /// # #[derive(Resource, Default, Clone)]
195    /// # struct SwarmState {}
196    /// # async fn swarm_handle(
197    /// #     mut swarm: Swarm,
198    /// #     event: SwarmEvent,
199    /// #     state: SwarmState,
200    /// # ) -> anyhow::Result<()> {
201    /// #     Ok(())
202    /// # }
203    /// ```
204    ///
205    /// [`StartJoinServerEvent`]: azalea_client::join::StartJoinServerEvent
206    #[must_use]
207    pub fn set_handler<S, Fut, R>(self, handler: HandleFn<S, Fut>) -> SwarmBuilder<S, SS, R, SR>
208    where
209        Fut: Future<Output = R> + Send + 'static,
210        S: Send + Sync + Clone + Component + Default + 'static,
211    {
212        SwarmBuilder {
213            handler: Some(Box::new(move |bot, event, state: S| {
214                Box::pin(handler(bot, event, state))
215            })),
216            // if we added accounts before the State was set, we've gotta set it to the default now
217            states: vec![S::default(); self.accounts.len()],
218            app: self.app,
219            ..self
220        }
221    }
222}
223
224impl<S, R> SwarmBuilder<S, NoSwarmState, R, ()>
225where
226    S: Send + Sync + Clone + Component + 'static,
227{
228    /// Set the function that's called every time the swarm receives a
229    /// [`SwarmEvent`]. This is the intended way to handle global swarm events.
230    ///
231    /// Currently you can have up to one swarm handler.
232    ///
233    /// Note that if you're creating clients directly from the ECS using
234    /// [`StartJoinServerEvent`] and the client wasn't already in the ECS, then
235    /// this handler function won't be called for that client. This also applies
236    /// to [`SwarmBuilder::set_handler`]. This shouldn't be a concern for
237    /// most bots, though.
238    ///
239    /// ```
240    /// # use azalea::{prelude::*, swarm::prelude::*};
241    /// # let swarm_builder = SwarmBuilder::new().set_handler(handle);
242    /// swarm_builder.set_swarm_handler(swarm_handle);
243    ///
244    /// # #[derive(Component, Default, Clone)]
245    /// # struct State {}
246    ///
247    /// # async fn handle(mut bot: Client, event: Event, state: State) -> anyhow::Result<()> {
248    /// #     Ok(())
249    /// # }
250    ///
251    /// #[derive(Resource, Default, Clone)]
252    /// struct SwarmState {}
253    /// async fn swarm_handle(
254    ///     mut swarm: Swarm,
255    ///     event: SwarmEvent,
256    ///     state: SwarmState,
257    /// ) -> anyhow::Result<()> {
258    ///     Ok(())
259    /// }
260    /// ```
261    ///
262    /// [`StartJoinServerEvent`]: azalea_client::join::StartJoinServerEvent
263    #[must_use]
264    pub fn set_swarm_handler<SS, Fut, SR>(
265        self,
266        handler: SwarmHandleFn<SS, Fut>,
267    ) -> SwarmBuilder<S, SS, R, SR>
268    where
269        SS: Default + Send + Sync + Clone + Resource + 'static,
270        Fut: Future<Output = SR> + Send + 'static,
271    {
272        SwarmBuilder {
273            handler: self.handler,
274            app: self.app,
275            accounts: self.accounts,
276            states: self.states,
277            swarm_state: SS::default(),
278            swarm_handler: Some(Box::new(move |swarm, event, state| {
279                Box::pin(handler(swarm, event, state))
280            })),
281            join_delay: self.join_delay,
282            reconnect_after: self.reconnect_after,
283        }
284    }
285}
286
287impl<S, SS, R, SR> SwarmBuilder<S, SS, R, SR>
288where
289    S: Send + Sync + Clone + Component + 'static,
290    SS: Default + Send + Sync + Clone + Resource + 'static,
291    R: Send + 'static,
292    SR: Send + 'static,
293{
294    /// Add a vec of [`Account`]s to the swarm.
295    ///
296    /// Use [`Self::add_account`] to only add one account. If you want the
297    /// clients to have different default states, add them one at a time with
298    /// [`Self::add_account_with_state`].
299    ///
300    /// By default, every account will join at the same time, you can add a
301    /// delay with [`Self::join_delay`].
302    #[must_use]
303    pub fn add_accounts(mut self, accounts: Vec<Account>) -> Self
304    where
305        S: Default,
306    {
307        for account in accounts {
308            self = self.add_account(account);
309        }
310
311        self
312    }
313
314    /// Add a single new [`Account`] to the swarm.
315    ///
316    /// Use [`Self::add_accounts`] to add multiple accounts at a time.
317    ///
318    /// This will make the state for this client be the default, use
319    /// [`Self::add_account_with_state`] to avoid that.
320    #[must_use]
321    pub fn add_account(self, account: Account) -> Self
322    where
323        S: Default,
324    {
325        self.add_account_with_state_and_opts(account, S::default(), JoinOpts::default())
326    }
327
328    /// Add an account with a custom initial state.
329    ///
330    /// Use just [`Self::add_account`] to use the `Default` implementation for
331    /// the state.
332    #[must_use]
333    pub fn add_account_with_state(self, account: Account, state: S) -> Self {
334        self.add_account_with_state_and_opts(account, state, JoinOpts::default())
335    }
336
337    /// Add an account with a custom initial state.
338    ///
339    /// Use just [`Self::add_account`] to use the `Default` implementation for
340    /// the state.
341    #[must_use]
342    pub fn add_account_with_opts(self, account: Account, opts: JoinOpts) -> Self
343    where
344        S: Default,
345    {
346        self.add_account_with_state_and_opts(account, S::default(), opts)
347    }
348
349    /// Same as [`Self::add_account_with_state`], but allow passing in custom
350    /// join options.
351    #[must_use]
352    pub fn add_account_with_state_and_opts(
353        mut self,
354        account: Account,
355        state: S,
356        join_opts: JoinOpts,
357    ) -> Self {
358        self.accounts.push((account, join_opts));
359        self.states.push(state);
360        self
361    }
362
363    /// Set the swarm state instead of initializing defaults.
364    #[must_use]
365    pub fn set_swarm_state(mut self, swarm_state: SS) -> Self {
366        self.swarm_state = swarm_state;
367        self
368    }
369
370    /// Add one or more plugins to this swarm.
371    ///
372    /// See [`Self::new_without_plugins`] to learn how to disable default
373    /// plugins.
374    #[must_use]
375    pub fn add_plugins<M>(mut self, plugins: impl Plugins<M>) -> Self {
376        self.app.add_plugins(plugins);
377        self
378    }
379
380    /// Set how long we should wait between each bot joining the server.
381    ///
382    /// By default, every bot will connect at the same time. If you set this
383    /// field, however, the bots will wait for the previous one to have
384    /// connected and *then* they'll wait the given duration.
385    #[must_use]
386    pub fn join_delay(mut self, delay: Duration) -> Self {
387        self.join_delay = Some(delay);
388        self
389    }
390
391    /// Configures the auto-reconnection behavior for our bots.
392    ///
393    /// If this is `Some`, then it'll set the default reconnection delay for our
394    /// bots (how long they'll wait after being kicked before they try
395    /// rejoining). if it's `None`, then auto-reconnecting will be disabled.
396    ///
397    /// If this function isn't called, then our clients will reconnect after
398    /// [`DEFAULT_RECONNECT_DELAY`].
399    #[must_use]
400    pub fn reconnect_after(mut self, delay: impl Into<Option<Duration>>) -> Self {
401        self.reconnect_after = delay.into();
402        self
403    }
404
405    /// Build this `SwarmBuilder` into an actual [`Swarm`] and join the given
406    /// server.
407    ///
408    /// The `address` argument can be a `&str`, [`ServerAddress`], or anything
409    /// that implements `TryInto<ServerAddress>`.
410    ///
411    /// [`ServerAddress`]: azalea_protocol::ServerAddress
412    pub async fn start(self, address: impl TryInto<ServerAddress>) -> Result<AppExit, StartError> {
413        // convert the TryInto<ServerAddress> into a ServerAddress
414        let address: ServerAddress = match address.try_into() {
415            Ok(address) => address,
416            Err(_) => return Err(StartError::InvalidAddress),
417        };
418
419        self.start_with_default_opts(address, JoinOpts::default())
420            .await
421    }
422
423    /// Do the same as [`Self::start`], but allow passing in default join
424    /// options for the bots.
425    pub async fn start_with_default_opts(
426        mut self,
427        address: impl TryInto<ServerAddress>,
428        default_join_opts: JoinOpts,
429    ) -> Result<AppExit, StartError> {
430        assert_eq!(
431            self.accounts.len(),
432            self.states.len(),
433            "There must be exactly one state per bot."
434        );
435
436        debug!("Starting Azalea {}", env!("CARGO_PKG_VERSION"));
437
438        // convert the TryInto<ServerAddress> into a ServerAddress
439        let address = match address.try_into() {
440            Ok(address) => address,
441            Err(_) => return Err(StartError::InvalidAddress),
442        };
443
444        let address: ServerAddress = default_join_opts.custom_address.clone().unwrap_or(address);
445        let resolved_address = if let Some(a) = default_join_opts.custom_resolved_address {
446            a
447        } else {
448            resolver::resolve_address(&address).await?
449        };
450
451        let instance_container = Arc::new(RwLock::new(InstanceContainer::default()));
452
453        // we can't modify the swarm plugins after this
454        let (bots_tx, mut bots_rx) = mpsc::unbounded_channel();
455        let (swarm_tx, mut swarm_rx) = mpsc::unbounded_channel();
456
457        swarm_tx.send(SwarmEvent::Init).unwrap();
458
459        let main_schedule_label = self.app.update_schedule.unwrap();
460
461        let (ecs_lock, start_running_systems, appexit_rx) = start_ecs_runner(&mut self.app);
462
463        let swarm = Swarm {
464            ecs_lock: ecs_lock.clone(),
465
466            resolved_address: Arc::new(RwLock::new(resolved_address)),
467            address: Arc::new(RwLock::new(address)),
468            instance_container,
469
470            bots_tx,
471
472            swarm_tx: swarm_tx.clone(),
473        };
474
475        // run the main schedule so the startup systems run
476        {
477            let mut ecs = ecs_lock.lock();
478            ecs.insert_resource(swarm.clone());
479            ecs.insert_resource(self.swarm_state.clone());
480            if let Some(reconnect_after) = self.reconnect_after {
481                ecs.insert_resource(AutoReconnectDelay {
482                    delay: reconnect_after,
483                });
484            } else {
485                ecs.remove_resource::<AutoReconnectDelay>();
486            }
487            ecs.run_schedule(main_schedule_label);
488            ecs.clear_trackers();
489        }
490
491        // only do this after we inserted the Swarm and state resources to avoid errors
492        // where Res<Swarm> is inaccessible
493        start_running_systems();
494
495        // SwarmBuilder (self) isn't Send so we have to take all the things we need out
496        // of it
497        let swarm_clone = swarm.clone();
498        let join_delay = self.join_delay;
499        let accounts = self.accounts.clone();
500        let states = self.states.clone();
501
502        tokio::spawn(async move {
503            if let Some(join_delay) = join_delay {
504                // if there's a join delay, then join one by one
505                for ((account, bot_join_opts), state) in accounts.iter().zip(states) {
506                    let mut join_opts = default_join_opts.clone();
507                    join_opts.update(bot_join_opts);
508                    let _ = swarm_clone.add_with_opts(account, state, &join_opts).await;
509                    tokio::time::sleep(join_delay).await;
510                }
511            } else {
512                // otherwise, join all at once
513                let swarm_borrow = &swarm_clone;
514                join_all(accounts.iter().zip(states).map(
515                    |((account, bot_join_opts), state)| async {
516                        let mut join_opts = default_join_opts.clone();
517                        join_opts.update(bot_join_opts);
518                        let _ = swarm_borrow
519                            .clone()
520                            .add_with_opts(account, state, &join_opts)
521                            .await;
522                    },
523                ))
524                .await;
525            }
526
527            swarm_tx.send(SwarmEvent::Login).unwrap();
528        });
529
530        let swarm_state = self.swarm_state;
531
532        // Watch swarm_rx and send those events to the swarm_handle.
533        let swarm_clone = swarm.clone();
534        let swarm_handler_task = tokio::spawn(async move {
535            while let Some(event) = swarm_rx.recv().await {
536                if let Some(swarm_handler) = &self.swarm_handler {
537                    tokio::spawn((swarm_handler)(
538                        swarm_clone.clone(),
539                        event,
540                        swarm_state.clone(),
541                    ));
542                }
543            }
544
545            unreachable!(
546                "The `Swarm` here contains a sender for the `SwarmEvent`s, so swarm_rx.recv() will never fail"
547            );
548        });
549
550        // bot events
551        let client_handler_task = tokio::spawn(async move {
552            while let Some((Some(first_event), first_bot)) = bots_rx.recv().await {
553                if bots_rx.len() > 1_000 {
554                    static WARNED: AtomicBool = AtomicBool::new(false);
555                    if !WARNED.swap(true, atomic::Ordering::Relaxed) {
556                        warn!("the Client Event channel has more than 1000 items!")
557                    }
558                }
559
560                if let Some(handler) = &self.handler {
561                    let ecs_mutex = first_bot.ecs.clone();
562                    let mut ecs = ecs_mutex.lock();
563                    let mut query = ecs.query::<Option<&S>>();
564                    let Ok(Some(first_bot_state)) = query.get(&ecs, first_bot.entity) else {
565                        error!(
566                            "the first bot ({} / {}) is missing the required state component! none of the client handler functions will be called.",
567                            first_bot.username(),
568                            first_bot.entity
569                        );
570                        continue;
571                    };
572                    let first_bot_entity = first_bot.entity;
573                    let first_bot_state = first_bot_state.clone();
574
575                    tokio::spawn((handler)(first_bot, first_event, first_bot_state.clone()));
576
577                    // this makes it not have to keep locking the ecs
578                    let mut states = HashMap::new();
579                    states.insert(first_bot_entity, first_bot_state);
580                    while let Ok((Some(event), bot)) = bots_rx.try_recv() {
581                        let state = match states.entry(bot.entity) {
582                            hash_map::Entry::Occupied(e) => e.into_mut(),
583                            hash_map::Entry::Vacant(e) => {
584                                let Ok(Some(state)) = query.get(&ecs, bot.entity) else {
585                                    error!(
586                                        "one of our bots ({} / {}) is missing the required state component! its client handler function will not be called.",
587                                        bot.username(),
588                                        bot.entity
589                                    );
590                                    continue;
591                                };
592                                let state = state.clone();
593                                e.insert(state)
594                            }
595                        };
596                        tokio::spawn((handler)(bot, event, state.clone()));
597                    }
598                }
599            }
600        });
601
602        let appexit = appexit_rx
603            .await
604            .expect("appexit_tx shouldn't be dropped by the ECS runner before sending");
605
606        swarm_handler_task.abort();
607        client_handler_task.abort();
608
609        Ok(appexit)
610    }
611}
612
613impl Default for SwarmBuilder<NoState, NoSwarmState, (), ()> {
614    fn default() -> Self {
615        Self::new()
616    }
617}
618
619/// An event about something that doesn't have to do with a single bot.
620#[derive(Clone, Debug)]
621#[non_exhaustive]
622pub enum SwarmEvent {
623    /// All the bots in the swarm have successfully joined the server.
624    Login,
625    /// The swarm was created.
626    ///
627    /// This is only fired once, and it's guaranteed to be the first event to
628    /// fire.
629    Init,
630    /// A bot got disconnected from the server.
631    ///
632    /// If you'd like to implement special auto-reconnect behavior beyond what's
633    /// built-in, you can disable that with [`SwarmBuilder::reconnect_delay`]
634    /// and then call [`Swarm::add_with_opts`] with the account and options
635    /// from this event.
636    ///
637    /// [`SwarmBuilder::reconnect_delay`]: crate::swarm::SwarmBuilder::reconnect_after
638    Disconnect(Box<Account>, JoinOpts),
639    /// At least one bot received a chat message.
640    Chat(ChatPacket),
641}
642
643pub type SwarmHandleFn<SS, Fut> = fn(Swarm, SwarmEvent, SS) -> Fut;
644pub type BoxSwarmHandleFn<SS, R> =
645    Box<dyn Fn(Swarm, SwarmEvent, SS) -> BoxFuture<'static, R> + Send + Sync>;
646
647/// Make a bot [`Swarm`].
648///
649/// [`Swarm`]: struct.Swarm.html
650///
651/// # Examples
652/// ```rust,no_run
653/// use azalea::{prelude::*, swarm::prelude::*};
654/// use std::time::Duration;
655///
656/// #[derive(Default, Clone, Component)]
657/// struct State {}
658///
659/// #[derive(Default, Clone, Resource)]
660/// struct SwarmState {}
661///
662/// #[tokio::main(flavor = "current_thread")]
663/// async fn main() {
664///     let mut accounts = Vec::new();
665///     let mut states = Vec::new();
666///
667///     for i in 0..10 {
668///         accounts.push(Account::offline(&format!("bot{i}")));
669///         states.push(State::default());
670///     }
671///
672///     SwarmBuilder::new()
673///         .add_accounts(accounts.clone())
674///         .set_handler(handle)
675///         .set_swarm_handler(swarm_handle)
676///         .join_delay(Duration::from_millis(1000))
677///         .start("localhost")
678///         .await
679///         .unwrap();
680/// }
681///
682/// async fn handle(bot: Client, event: Event, _state: State) -> anyhow::Result<()> {
683///     match &event {
684///         _ => {}
685///     }
686///     Ok(())
687/// }
688///
689/// async fn swarm_handle(
690///     mut swarm: Swarm,
691///     event: SwarmEvent,
692///     _state: SwarmState,
693/// ) -> anyhow::Result<()> {
694///     match &event {
695///         SwarmEvent::Chat(m) => {
696///             println!("{}", m.message().to_ansi());
697///         }
698///         _ => {}
699///     }
700///     Ok(())
701/// }
702impl Swarm {
703    /// Add a new account to the swarm.
704    ///
705    /// You can remove it later by calling [`Client::disconnect`].
706    ///
707    /// # Errors
708    ///
709    /// Returns an error if the server's address could not be resolved.
710    pub async fn add<S: Component + Clone>(
711        &self,
712        account: &Account,
713        state: S,
714    ) -> Result<Client, JoinError> {
715        self.add_with_opts(account, state, &JoinOpts::default())
716            .await
717    }
718    /// Add a new account to the swarm, using custom options.
719    ///
720    /// This is useful if you want bots in the same swarm to connect to
721    /// different addresses. Usually you'll just want [`Self::add`] though.
722    ///
723    /// # Errors
724    ///
725    /// Returns an error if the server's address could not be resolved.
726    pub async fn add_with_opts<S: Component + Clone>(
727        &self,
728        account: &Account,
729        state: S,
730        join_opts: &JoinOpts,
731    ) -> Result<Client, JoinError> {
732        debug!(
733            "add_with_opts called for account {} with opts {join_opts:?}",
734            account.username
735        );
736
737        let address = join_opts
738            .custom_address
739            .clone()
740            .unwrap_or_else(|| self.address.read().clone());
741        let resolved_address = join_opts
742            .custom_resolved_address
743            .unwrap_or_else(|| *self.resolved_address.read());
744        let proxy = join_opts.proxy.clone();
745
746        let (tx, rx) = mpsc::unbounded_channel();
747
748        let client = Client::start_client(StartClientOpts {
749            ecs_lock: self.ecs_lock.clone(),
750            account: account.clone(),
751            connect_opts: ConnectOpts {
752                address,
753                resolved_address,
754                proxy,
755            },
756            event_sender: Some(tx),
757        })
758        .await;
759        // add the state to the client
760        {
761            let mut ecs = self.ecs_lock.lock();
762            ecs.entity_mut(client.entity).insert(state);
763        }
764
765        let cloned_bot = client.clone();
766        let swarm_tx = self.swarm_tx.clone();
767        let bots_tx = self.bots_tx.clone();
768
769        let join_opts = join_opts.clone();
770        tokio::spawn(Self::event_copying_task(
771            rx, swarm_tx, bots_tx, cloned_bot, join_opts,
772        ));
773
774        Ok(client)
775    }
776
777    /// Copy the events from a client's receiver into bots_tx, until the bot is
778    /// removed from the ECS.
779    async fn event_copying_task(
780        mut rx: mpsc::UnboundedReceiver<Event>,
781        swarm_tx: mpsc::UnboundedSender<SwarmEvent>,
782        bots_tx: mpsc::UnboundedSender<(Option<Event>, Client)>,
783        bot: Client,
784        join_opts: JoinOpts,
785    ) {
786        while let Some(event) = rx.recv().await {
787            if rx.len() > 1_000 {
788                static WARNED_1_000: AtomicBool = AtomicBool::new(false);
789                if !WARNED_1_000.swap(true, atomic::Ordering::Relaxed) {
790                    warn!(
791                        "the client's Event channel has more than 1,000 items! this is probably fine but if you're concerned about it, maybe consider disabling the packet-event feature in azalea to reduce the number of events?"
792                    )
793                }
794
795                if rx.len() > 10_000 {
796                    static WARNED_10_000: AtomicBool = AtomicBool::new(false);
797                    if !WARNED_10_000.swap(true, atomic::Ordering::Relaxed) {
798                        warn!("the client's Event channel has more than 10,000 items!!")
799                    }
800
801                    if rx.len() > 100_000 {
802                        static WARNED_100_000: AtomicBool = AtomicBool::new(false);
803                        if !WARNED_100_000.swap(true, atomic::Ordering::Relaxed) {
804                            warn!("the client's Event channel has more than 100,000 items!!!")
805                        }
806
807                        if rx.len() > 1_000_000 {
808                            static WARNED_1_000_000: AtomicBool = AtomicBool::new(false);
809                            if !WARNED_1_000_000.swap(true, atomic::Ordering::Relaxed) {
810                                warn!(
811                                    "the client's Event channel has more than 1,000,000 items!!!! your code is almost certainly leaking memory"
812                                )
813                            }
814                        }
815                    }
816                }
817            }
818
819            if let Event::Disconnect(_) = event {
820                debug!(
821                    "sending SwarmEvent::Disconnect due to receiving an Event::Disconnect from client {}",
822                    bot.entity
823                );
824                let account = bot
825                    .get_component::<Account>()
826                    .expect("bot is missing required Account component");
827                swarm_tx
828                    .send(SwarmEvent::Disconnect(Box::new(account), join_opts.clone()))
829                    .unwrap();
830            }
831
832            // we can't handle events here (since we can't copy the handler),
833            // they're handled above in SwarmBuilder::start
834            if let Err(e) = bots_tx.send((Some(event), bot.clone())) {
835                error!(
836                    "Error sending event to swarm, aborting event_copying_task for {}: {e}",
837                    bot.entity
838                );
839                break;
840            }
841        }
842        debug!(
843            "client sender ended for {}, this won't trigger SwarmEvent::Disconnect unless the client already sent its own disconnect event",
844            bot.entity
845        );
846    }
847
848    /// Add a new account to the swarm, retrying if it couldn't join.
849    ///
850    /// This will run forever until the bot joins or the task is aborted.
851    ///
852    /// This does exponential backoff (though very limited), starting at 5
853    /// seconds and doubling up to 15 seconds.
854    #[deprecated(note = "azalea has auto-reconnect functionality built-in now, use `add` instead")]
855    pub async fn add_and_retry_forever<S: Component + Clone>(
856        &self,
857        account: &Account,
858        state: S,
859    ) -> Client {
860        #[allow(deprecated)]
861        self.add_and_retry_forever_with_opts(account, state, &JoinOpts::default())
862            .await
863    }
864
865    /// Same as [`Self::add_and_retry_forever`], but allow passing custom join
866    /// options.
867    #[deprecated(
868        note = "azalea has auto-reconnect functionality built-in now, use `add_with_opts` instead"
869    )]
870    pub async fn add_and_retry_forever_with_opts<S: Component + Clone>(
871        &self,
872        account: &Account,
873        state: S,
874        opts: &JoinOpts,
875    ) -> Client {
876        let mut disconnects: u32 = 0;
877        loop {
878            match self.add_with_opts(account, state.clone(), opts).await {
879                Ok(bot) => return bot,
880                Err(e) => {
881                    disconnects += 1;
882                    let delay = (Duration::from_secs(5) * 2u32.pow(disconnects.min(16)))
883                        .min(Duration::from_secs(15));
884                    let username = account.username.clone();
885
886                    error!("Error joining as {username}: {e}. Waiting {delay:?} and trying again.");
887
888                    sleep(delay).await;
889                }
890            }
891        }
892    }
893
894    /// Get an array of ECS [`Entity`]s for all [`LocalEntity`]s in our world.
895    /// This will include clients that were disconnected without being removed
896    /// from the ECS.
897    ///
898    /// [`LocalEntity`]: azalea_entity::LocalEntity
899    pub fn client_entities(&self) -> Box<[Entity]> {
900        let mut ecs = self.ecs_lock.lock();
901        let mut query = ecs.query_filtered::<Entity, With<LocalEntity>>();
902        query.iter(&ecs).collect::<Box<[Entity]>>()
903    }
904}
905
906impl IntoIterator for Swarm {
907    type Item = Client;
908    type IntoIter = std::vec::IntoIter<Self::Item>;
909
910    /// Iterate over the bots in this swarm.
911    ///
912    /// ```rust,no_run
913    /// # use azalea::{prelude::*, swarm::prelude::*};
914    /// #[derive(Component, Clone)]
915    /// # pub struct State;
916    /// # fn example(swarm: Swarm) {
917    /// for bot in swarm {
918    ///     let state = bot.component::<State>();
919    ///     // ...
920    /// }
921    /// # }
922    /// ```
923    fn into_iter(self) -> Self::IntoIter {
924        let client_entities = self.client_entities();
925
926        client_entities
927            .into_iter()
928            .map(|entity| Client::new(entity, self.ecs_lock.clone()))
929            .collect::<Box<[Client]>>()
930            .into_iter()
931    }
932}
933
934/// This plugin group will add all the default plugins necessary for swarms to
935/// work.
936pub struct DefaultSwarmPlugins;
937
938impl PluginGroup for DefaultSwarmPlugins {
939    fn build(self) -> PluginGroupBuilder {
940        PluginGroupBuilder::start::<Self>()
941            .add(chat::SwarmChatPlugin)
942            .add(events::SwarmPlugin)
943    }
944}
945
946/// A marker that can be used in place of a SwarmState in [`SwarmBuilder`].
947///
948/// You probably don't need to use this manually since the compiler will infer
949/// it for you.
950#[derive(Resource, Clone, Default)]
951pub struct NoSwarmState;