// azalea/swarm/builder.rs

1use std::{
2    collections::{HashMap, hash_map},
3    mem,
4    sync::{
5        Arc,
6        atomic::{self, AtomicBool},
7    },
8    time::Duration,
9};
10
11use azalea_client::{DefaultPlugins, account::Account, start_ecs_runner};
12use azalea_protocol::address::{ResolvableAddr, ResolvedAddr};
13use azalea_world::Worlds;
14use bevy_app::{App, AppExit, Plugins, SubApp};
15use bevy_ecs::{component::Component, resource::Resource};
16use futures::future::join_all;
17use parking_lot::RwLock;
18use tokio::{sync::mpsc, task};
19use tracing::{debug, error, warn};
20
21use crate::{
22    BoxHandleFn, HandleFn, JoinOpts, NoState,
23    auto_reconnect::{AutoReconnectDelay, DEFAULT_RECONNECT_DELAY},
24    bot::DefaultBotPlugins,
25    swarm::{
26        BoxSwarmHandleFn, DefaultSwarmPlugins, NoSwarmState, Swarm, SwarmEvent, SwarmHandleFn,
27    },
28};
29
30/// Create a new [`Swarm`].
31///
32/// The generics of this struct stand for the following:
33/// - S: State
34/// - SS: Swarm State
35/// - R: Return type of the handler
36/// - SR: Return type of the swarm handler
37///
38/// You shouldn't have to manually set them though, they'll be inferred for you.
39pub struct SwarmBuilder<S, SS, R, SR>
40where
41    S: Send + Sync + Clone + Component + 'static,
42    SS: Default + Send + Sync + Clone + Resource + 'static,
43    Self: Send,
44{
45    // SubApp is used instead of App to make it Send
46    pub(crate) app: SubApp,
47    /// The accounts and proxies that are going to join the server.
48    pub(crate) accounts: Vec<(Account, JoinOpts)>,
49    /// The individual bot states.
50    ///
51    /// This must be the same length as `accounts`, since each bot gets one
52    /// state.
53    pub(crate) states: Vec<S>,
54    /// The state for the overall swarm.
55    pub(crate) swarm_state: SS,
56    /// The function that's called every time a bot receives an [`Event`].
57    pub(crate) handler: Option<BoxHandleFn<S, R>>,
58    /// The function that's called every time the swarm receives a
59    /// [`SwarmEvent`].
60    pub(crate) swarm_handler: Option<BoxSwarmHandleFn<SS, SR>>,
61
62    /// How long we should wait between each bot joining the server.
63    ///
64    /// If this is None, every bot will connect at the same time. None is
65    /// different than a duration of 0, since if a duration is present the
66    /// bots will wait for the previous one to be ready.
67    pub(crate) join_delay: Option<Duration>,
68
69    /// The default reconnection delay for our bots.
70    ///
71    /// This will change the value of the [`AutoReconnectDelay`] resource.
72    pub(crate) reconnect_after: Option<Duration>,
73}
74impl SwarmBuilder<NoState, NoSwarmState, (), ()> {
75    /// Start creating the swarm.
76    #[must_use]
77    pub fn new() -> Self {
78        Self::new_without_plugins().add_plugins((
79            DefaultPlugins,
80            DefaultBotPlugins,
81            DefaultSwarmPlugins,
82        ))
83    }
84
85    /// [`Self::new`] but without adding the plugins by default.
86    ///
87    /// This is useful if you want to disable a default plugin. This also exists
88    /// for `ClientBuilder`, see [`ClientBuilder::new_without_plugins`].
89    ///
90    /// You **must** add [`DefaultPlugins`], [`DefaultBotPlugins`], and
91    /// [`DefaultSwarmPlugins`] to this.
92    ///
93    /// ```
94    /// # use azalea::{prelude::*, swarm::{prelude::*, DefaultSwarmPlugins}, bot::DefaultBotPlugins};
95    /// use azalea::app::PluginGroup;
96    ///
97    /// let swarm_builder = SwarmBuilder::new_without_plugins()
98    ///     .add_plugins((
99    ///         DefaultBotPlugins,
100    ///         DefaultSwarmPlugins,
101    ///         azalea::DefaultPlugins
102    ///             .build()
103    ///             .disable::<azalea::chat_signing::ChatSigningPlugin>(),
104    /// ));
105    /// # swarm_builder.set_handler(handle).set_swarm_handler(swarm_handle);
106    /// # #[derive(Clone, Component, Default, Resource)]
107    /// # pub struct State;
108    /// # async fn handle(mut bot: Client, event: Event, state: State) -> anyhow::Result<()> {
109    /// #     Ok(())
110    /// # }
111    /// # async fn swarm_handle(swarm: Swarm, event: SwarmEvent, state: State) -> anyhow::Result<()> {
112    /// #     Ok(())
113    /// # }
114    /// ```
115    ///
116    /// [`ClientBuilder::new_without_plugins`]: crate::ClientBuilder::new_without_plugins
117    #[must_use]
118    pub fn new_without_plugins() -> Self {
119        SwarmBuilder {
120            // we create the app here so plugins can add onto it.
121            // the schedules won't run until [`Self::start`] is called.
122
123            // `App::new()` is used instead of `SubApp::new()` so the necessary resources are
124            // initialized
125            app: mem::take(App::new().main_mut()),
126            accounts: Vec::new(),
127            states: Vec::new(),
128            swarm_state: NoSwarmState,
129            handler: None,
130            swarm_handler: None,
131            join_delay: None,
132            reconnect_after: Some(DEFAULT_RECONNECT_DELAY),
133        }
134    }
135}
136
137impl<SS, SR> SwarmBuilder<NoState, SS, (), SR>
138where
139    SS: Default + Send + Sync + Clone + Resource + 'static,
140{
141    /// Set the function that's called every time a bot receives an
142    /// [`Event`](crate::Event). This is the intended way to handle
143    /// normal per-bot events.
144    ///
145    /// Currently you can have up to one handler.
146    ///
147    /// Note that if you're creating clients directly from the ECS using
148    /// [`StartJoinServerEvent`] and the client wasn't already in the ECS, then
149    /// the handler function won't be called for that client. This also applies
150    /// to [`SwarmBuilder::set_swarm_handler`]. This shouldn't be a concern for
151    /// most bots, though.
152    ///
153    /// ```
154    /// # use azalea::{prelude::*, swarm::prelude::*};
155    /// # let swarm_builder = SwarmBuilder::new().set_swarm_handler(swarm_handle);
156    /// swarm_builder.set_handler(handle);
157    ///
158    /// #[derive(Clone, Component, Default)]
159    /// struct State {}
160    /// async fn handle(mut bot: Client, event: Event, state: State) -> anyhow::Result<()> {
161    ///     Ok(())
162    /// }
163    ///
164    /// # #[derive(Clone, Default, Resource)]
165    /// # struct SwarmState {}
166    /// # async fn swarm_handle(
167    /// #     mut swarm: Swarm,
168    /// #     event: SwarmEvent,
169    /// #     state: SwarmState,
170    /// # ) -> anyhow::Result<()> {
171    /// #     Ok(())
172    /// # }
173    /// ```
174    ///
175    /// [`StartJoinServerEvent`]: azalea_client::join::StartJoinServerEvent
176    #[must_use]
177    pub fn set_handler<S, Fut, R>(self, handler: HandleFn<S, Fut>) -> SwarmBuilder<S, SS, R, SR>
178    where
179        Fut: Future<Output = R> + Send + 'static,
180        S: Send + Sync + Clone + Component + Default + 'static,
181    {
182        SwarmBuilder {
183            handler: Some(Box::new(move |bot, event, state: S| {
184                Box::pin(handler(bot, event, state))
185            })),
186            // if we added accounts before the State was set, we've gotta set it to the default now
187            states: vec![S::default(); self.accounts.len()],
188            app: self.app,
189            ..self
190        }
191    }
192}
193
194impl<S, R> SwarmBuilder<S, NoSwarmState, R, ()>
195where
196    S: Send + Sync + Clone + Component + 'static,
197{
198    /// Set the function that's called every time the swarm receives a
199    /// [`SwarmEvent`]. This is the intended way to handle global swarm events.
200    ///
201    /// Currently you can have up to one swarm handler.
202    ///
203    /// Note that if you're creating clients directly from the ECS using
204    /// [`StartJoinServerEvent`] and the client wasn't already in the ECS, then
205    /// this handler function won't be called for that client. This also applies
206    /// to [`SwarmBuilder::set_handler`]. This shouldn't be a concern for
207    /// most bots, though.
208    ///
209    /// ```
210    /// # use azalea::{prelude::*, swarm::prelude::*};
211    /// # let swarm_builder = SwarmBuilder::new().set_handler(handle);
212    /// swarm_builder.set_swarm_handler(swarm_handle);
213    ///
214    /// # #[derive(Clone, Component, Default)]
215    /// # struct State {}
216    ///
217    /// # async fn handle(mut bot: Client, event: Event, state: State) -> anyhow::Result<()> {
218    /// #     Ok(())
219    /// # }
220    ///
221    /// #[derive(Clone, Default, Resource)]
222    /// struct SwarmState {}
223    /// async fn swarm_handle(
224    ///     mut swarm: Swarm,
225    ///     event: SwarmEvent,
226    ///     state: SwarmState,
227    /// ) -> anyhow::Result<()> {
228    ///     Ok(())
229    /// }
230    /// ```
231    ///
232    /// [`StartJoinServerEvent`]: azalea_client::join::StartJoinServerEvent
233    #[must_use]
234    pub fn set_swarm_handler<SS, Fut, SR>(
235        self,
236        handler: SwarmHandleFn<SS, Fut>,
237    ) -> SwarmBuilder<S, SS, R, SR>
238    where
239        SS: Default + Send + Sync + Clone + Resource + 'static,
240        Fut: Future<Output = SR> + Send + 'static,
241    {
242        SwarmBuilder {
243            handler: self.handler,
244            app: self.app,
245            accounts: self.accounts,
246            states: self.states,
247            swarm_state: SS::default(),
248            swarm_handler: Some(Box::new(move |swarm, event, state| {
249                Box::pin(handler(swarm, event, state))
250            })),
251            join_delay: self.join_delay,
252            reconnect_after: self.reconnect_after,
253        }
254    }
255}
256
257impl<S, SS, R, SR> SwarmBuilder<S, SS, R, SR>
258where
259    S: Send + Sync + Clone + Component + 'static,
260    SS: Default + Send + Sync + Clone + Resource + 'static,
261    R: Send + 'static,
262    SR: Send + 'static,
263{
264    /// Add a vec of [`Account`]s to the swarm.
265    ///
266    /// Use [`Self::add_account`] to only add one account. If you want the
267    /// clients to have different default states, add them one at a time with
268    /// [`Self::add_account_with_state`].
269    ///
270    /// By default, every account will join at the same time, you can add a
271    /// delay with [`Self::join_delay`].
272    #[must_use]
273    pub fn add_accounts(mut self, accounts: Vec<Account>) -> Self
274    where
275        S: Default,
276    {
277        for account in accounts {
278            self = self.add_account(account);
279        }
280
281        self
282    }
283
284    /// Add a single new [`Account`] to the swarm.
285    ///
286    /// Use [`Self::add_accounts`] to add multiple accounts at a time.
287    ///
288    /// This will make the state for this client be the default, use
289    /// [`Self::add_account_with_state`] to avoid that.
290    #[must_use]
291    pub fn add_account(self, account: Account) -> Self
292    where
293        S: Default,
294    {
295        self.add_account_with_state_and_opts(account, S::default(), JoinOpts::default())
296    }
297
298    /// Add an account with a custom initial state.
299    ///
300    /// Use just [`Self::add_account`] to use the `Default` implementation for
301    /// the state.
302    #[must_use]
303    pub fn add_account_with_state(self, account: Account, state: S) -> Self {
304        self.add_account_with_state_and_opts(account, state, JoinOpts::default())
305    }
306
307    /// Add an account with a custom initial state.
308    ///
309    /// Use just [`Self::add_account`] to use the `Default` implementation for
310    /// the state.
311    #[must_use]
312    pub fn add_account_with_opts(self, account: Account, opts: JoinOpts) -> Self
313    where
314        S: Default,
315    {
316        self.add_account_with_state_and_opts(account, S::default(), opts)
317    }
318
319    /// Same as [`Self::add_account_with_state`], but allow passing in custom
320    /// join options.
321    #[must_use]
322    pub fn add_account_with_state_and_opts(
323        mut self,
324        account: Account,
325        state: S,
326        join_opts: JoinOpts,
327    ) -> Self {
328        self.accounts.push((account, join_opts));
329        self.states.push(state);
330        self
331    }
332
333    /// Set the swarm state instead of initializing defaults.
334    #[must_use]
335    pub fn set_swarm_state(mut self, swarm_state: SS) -> Self {
336        self.swarm_state = swarm_state;
337        self
338    }
339
340    /// Add one or more plugins to this swarm.
341    ///
342    /// See [`Self::new_without_plugins`] to learn how to disable default
343    /// plugins.
344    #[must_use]
345    pub fn add_plugins<M>(mut self, plugins: impl Plugins<M>) -> Self {
346        self.app.add_plugins(plugins);
347        self
348    }
349
350    /// Set how long we should wait between each bot joining the server.
351    ///
352    /// By default, every bot will connect at the same time. If you set this
353    /// field, however, the bots will wait for the previous one to have
354    /// connected and *then* they'll wait the given duration.
355    #[must_use]
356    pub fn join_delay(mut self, delay: Duration) -> Self {
357        self.join_delay = Some(delay);
358        self
359    }
360
361    /// Configures the auto-reconnection behavior for our bots.
362    ///
363    /// If this is `Some`, then it'll set the default reconnection delay for our
364    /// bots (how long they'll wait after being kicked before they try
365    /// rejoining). if it's `None`, then auto-reconnecting will be disabled.
366    ///
367    /// If this function isn't called, then our clients will reconnect after
368    /// [`DEFAULT_RECONNECT_DELAY`].
369    #[must_use]
370    pub fn reconnect_after(mut self, delay: impl Into<Option<Duration>>) -> Self {
371        self.reconnect_after = delay.into();
372        self
373    }
374
375    /// Build this `SwarmBuilder` into an actual [`Swarm`] and join the given
376    /// server.
377    ///
378    /// The `address` argument can be a `&str`, [`ServerAddr`],
379    /// [`ResolvedAddr`], or anything else that implements [`ResolvableAddr`].
380    ///
381    /// [`ServerAddr`]: ../../azalea_protocol/address/struct.ServerAddr.html
382    /// [`ResolvedAddr`]: ../../azalea_protocol/address/struct.ResolvedAddr.html
383    /// [`ResolvableAddr`]: ../../azalea_protocol/address/trait.ResolvableAddr.html
384    pub async fn start(self, address: impl ResolvableAddr) -> AppExit {
385        self.start_with_opts(address, JoinOpts::default()).await
386    }
387
388    #[doc(hidden)]
389    #[deprecated = "renamed to `start_with_opts`."]
390    pub async fn start_with_default_opts(
391        self,
392        address: impl ResolvableAddr,
393        default_join_opts: JoinOpts,
394    ) -> AppExit {
395        self.start_with_opts(address, default_join_opts).await
396    }
397
398    /// Do the same as [`Self::start`], but allow passing in default join
399    /// options for the bots.
400    pub async fn start_with_opts(
401        mut self,
402        address: impl ResolvableAddr,
403        join_opts: JoinOpts,
404    ) -> AppExit {
405        assert_eq!(
406            self.accounts.len(),
407            self.states.len(),
408            "There must be exactly one state per bot."
409        );
410
411        debug!("Starting Azalea {}", env!("CARGO_PKG_VERSION"));
412
413        let address = if let Some(socket_addr) = join_opts.custom_socket_addr {
414            let server_addr = if let Some(server_addr) = join_opts
415                .custom_server_addr
416                .clone()
417                .or_else(|| address.clone().server_addr().ok())
418            {
419                server_addr
420            } else {
421                error!(
422                    "Failed to parse address: {address:?}. If this was expected, consider passing in a `ServerAddr` instead."
423                );
424                return AppExit::error();
425            };
426
427            ResolvedAddr {
428                server: server_addr,
429                socket: socket_addr,
430            }
431        } else {
432            let Ok(addr) = address.clone().resolve().await else {
433                error!(
434                    "Failed to resolve address: {address:?}. If this was expected, consider resolving the address earlier with `ResolvableAddr::resolve`."
435                );
436                return AppExit::error();
437            };
438            addr
439        };
440
441        let worlds = Arc::new(RwLock::new(Worlds::default()));
442
443        // we can't modify the swarm plugins after this
444        let (bots_tx, mut bots_rx) = mpsc::unbounded_channel();
445        let (swarm_tx, mut swarm_rx) = mpsc::unbounded_channel();
446
447        swarm_tx.send(SwarmEvent::Init).unwrap();
448
449        let main_schedule_label = self.app.update_schedule.unwrap();
450
451        let local_set = task::LocalSet::new();
452
453        local_set.run_until(async move {
454            // start_ecs_runner must be run inside of the LocalSet
455            let (ecs_lock, start_running_systems, appexit_rx) = start_ecs_runner(&mut self.app);
456
457            let swarm = Swarm {
458                ecs: ecs_lock.clone(),
459
460                address: Arc::new(RwLock::new(address)),
461                worlds,
462
463                bots_tx,
464
465                swarm_tx: swarm_tx.clone(),
466            };
467
468            // run the main schedule so the startup systems run
469            {
470                let mut ecs = ecs_lock.write();
471                ecs.insert_resource(swarm.clone());
472                ecs.insert_resource(self.swarm_state.clone());
473                if let Some(reconnect_after) = self.reconnect_after {
474                    ecs.insert_resource(AutoReconnectDelay {
475                        delay: reconnect_after,
476                    });
477                } else {
478                    ecs.remove_resource::<AutoReconnectDelay>();
479                }
480                ecs.run_schedule(main_schedule_label);
481                ecs.clear_trackers();
482            }
483
484            // only do this after we inserted the Swarm and state resources to avoid errors
485            // where Res<Swarm> is inaccessible
486            start_running_systems();
487
488            // SwarmBuilder (self) isn't Send so we have to take all the things we need out
489            // of it
490            let swarm_clone = swarm.clone();
491            let join_delay = self.join_delay;
492            let accounts = self.accounts.clone();
493            let states = self.states.clone();
494
495            task::spawn_local(async move {
496                if let Some(join_delay) = join_delay {
497                    // if there's a join delay, then join one by one
498                    for ((account, bot_join_opts), state) in accounts.iter().zip(states) {
499                        let mut join_opts = join_opts.clone();
500                        join_opts.update(bot_join_opts);
501                        let _ = swarm_clone.add_with_opts(account, state, &join_opts).await;
502                        tokio::time::sleep(join_delay).await;
503                    }
504                } else {
505                    // otherwise, join all at once
506                    let swarm_borrow = &swarm_clone;
507                    join_all(accounts.iter().zip(states).map(
508                        |((account, bot_join_opts), state)| async {
509                            let mut join_opts = join_opts.clone();
510                            join_opts.update(bot_join_opts);
511                            let _ = swarm_borrow
512                                .clone()
513                                .add_with_opts(account, state, &join_opts)
514                                .await;
515                        },
516                    ))
517                    .await;
518                }
519
520                swarm_tx.send(SwarmEvent::Login).unwrap();
521            });
522
523            let swarm_state = self.swarm_state;
524
525            // Watch swarm_rx and send those events to the swarm_handle.
526            let swarm_clone = swarm.clone();
527            let swarm_handler_task = task::spawn_local(async move {
528                while let Some(event) = swarm_rx.recv().await {
529                    if let Some(swarm_handler) = &self.swarm_handler {
530                        task::spawn_local((swarm_handler)(
531                            swarm_clone.clone(),
532                            event,
533                            swarm_state.clone(),
534                        ));
535                    }
536                }
537
538                unreachable!(
539                    "The `Swarm` here contains a sender for the `SwarmEvent`s, so swarm_rx.recv() will never fail"
540                );
541            });
542
543            // bot events
544            let client_handler_task = task::spawn_local(async move {
545                while let Some((Some(first_event), first_bot)) = bots_rx.recv().await {
546                    if bots_rx.len() > 1_000 {
547                        static WARNED: AtomicBool = AtomicBool::new(false);
548                        if !WARNED.swap(true, atomic::Ordering::Relaxed) {
549                            warn!("the Client Event channel has more than 1000 items!")
550                        }
551                    }
552
553                    if let Some(handler) = &self.handler {
554                        let ecs_mutex = first_bot.ecs.clone();
555                        let mut ecs = ecs_mutex.write();
556                        let mut query = ecs.query::<Option<&S>>();
557                        let Ok(Some(first_bot_state)) = query.get(&ecs, first_bot.entity) else {
558                            error!(
559                                "the first bot ({} / {}) is missing the required state component! none of the client handler functions will be called.",
560                                first_bot.username(),
561                                first_bot.entity
562                            );
563                            continue;
564                        };
565                        let first_bot_entity = first_bot.entity;
566                        let first_bot_state = first_bot_state.clone();
567
568                        task::spawn_local((handler)(first_bot, first_event, first_bot_state.clone()));
569
570                        // this makes it not have to keep locking the ecs
571                        let mut states = HashMap::new();
572                        states.insert(first_bot_entity, first_bot_state);
573                        while let Ok((Some(event), bot)) = bots_rx.try_recv() {
574                            let state = match states.entry(bot.entity) {
575                                hash_map::Entry::Occupied(e) => e.into_mut(),
576                                hash_map::Entry::Vacant(e) => {
577                                    let Ok(Some(state)) = query.get(&ecs, bot.entity) else {
578                                        error!(
579                                            "one of our bots ({} / {}) is missing the required state component! its client handler function will not be called.",
580                                            bot.username(),
581                                            bot.entity
582                                        );
583                                        continue;
584                                    };
585                                    let state = state.clone();
586                                    e.insert(state)
587                                }
588                            };
589                            task::spawn_local((handler)(bot, event, state.clone()));
590                        }
591                    }
592                }
593            });
594
595            let app_exit = appexit_rx
596                .await
597                .expect("appexit_tx shouldn't be dropped by the ECS runner before sending");
598
599            swarm_handler_task.abort();
600            client_handler_task.abort();
601
602            app_exit
603        }).await
604    }
605}
606
607impl Default for SwarmBuilder<NoState, NoSwarmState, (), ()> {
608    fn default() -> Self {
609        Self::new()
610    }
611}