azalea/swarm/
builder.rs

1use std::{
2    collections::{HashMap, hash_map},
3    mem,
4    sync::{
5        Arc,
6        atomic::{self, AtomicBool},
7    },
8    time::Duration,
9};
10
11use azalea_client::{Account, DefaultPlugins, start_ecs_runner};
12use azalea_protocol::address::{ResolvableAddr, ResolvedAddr};
13use azalea_world::InstanceContainer;
14use bevy_app::{App, AppExit, Plugins, SubApp};
15use bevy_ecs::{component::Component, resource::Resource};
16use futures::future::join_all;
17use parking_lot::RwLock;
18use tokio::{sync::mpsc, task};
19use tracing::{debug, error, warn};
20
21use crate::{
22    BoxHandleFn, HandleFn, JoinOpts, NoState,
23    auto_reconnect::{AutoReconnectDelay, DEFAULT_RECONNECT_DELAY},
24    bot::DefaultBotPlugins,
25    swarm::{
26        BoxSwarmHandleFn, DefaultSwarmPlugins, NoSwarmState, Swarm, SwarmEvent, SwarmHandleFn,
27    },
28};
29
30/// Create a new [`Swarm`].
31///
32/// The generics of this struct stand for the following:
33/// - S: State
34/// - SS: Swarm State
35/// - R: Return type of the handler
36/// - SR: Return type of the swarm handler
37///
38/// You shouldn't have to manually set them though, they'll be inferred for you.
39pub struct SwarmBuilder<S, SS, R, SR>
40where
41    S: Send + Sync + Clone + Component + 'static,
42    SS: Default + Send + Sync + Clone + Resource + 'static,
43    Self: Send,
44{
45    // SubApp is used instead of App to make it Send
46    pub(crate) app: SubApp,
47    /// The accounts and proxies that are going to join the server.
48    pub(crate) accounts: Vec<(Account, JoinOpts)>,
49    /// The individual bot states.
50    ///
51    /// This must be the same length as `accounts`, since each bot gets one
52    /// state.
53    pub(crate) states: Vec<S>,
54    /// The state for the overall swarm.
55    pub(crate) swarm_state: SS,
56    /// The function that's called every time a bot receives an [`Event`].
57    pub(crate) handler: Option<BoxHandleFn<S, R>>,
58    /// The function that's called every time the swarm receives a
59    /// [`SwarmEvent`].
60    pub(crate) swarm_handler: Option<BoxSwarmHandleFn<SS, SR>>,
61
62    /// How long we should wait between each bot joining the server.
63    ///
64    /// If this is None, every bot will connect at the same time. None is
65    /// different than a duration of 0, since if a duration is present the
66    /// bots will wait for the previous one to be ready.
67    pub(crate) join_delay: Option<Duration>,
68
69    /// The default reconnection delay for our bots.
70    ///
71    /// This will change the value of the [`AutoReconnectDelay`] resource.
72    pub(crate) reconnect_after: Option<Duration>,
73}
74impl SwarmBuilder<NoState, NoSwarmState, (), ()> {
75    /// Start creating the swarm.
76    #[must_use]
77    pub fn new() -> Self {
78        Self::new_without_plugins()
79            .add_plugins(DefaultPlugins)
80            .add_plugins(DefaultBotPlugins)
81            .add_plugins(DefaultSwarmPlugins)
82    }
83
84    /// [`Self::new`] but without adding the plugins by default.
85    ///
86    /// This is useful if you want to disable a default plugin. This also exists
87    /// for `ClientBuilder`, see [`ClientBuilder::new_without_plugins`].
88    ///
89    /// You **must** add [`DefaultPlugins`], [`DefaultBotPlugins`], and
90    /// [`DefaultSwarmPlugins`] to this.
91    ///
92    /// ```
93    /// # use azalea::{prelude::*, swarm::prelude::*};
94    /// use azalea::app::PluginGroup;
95    ///
96    /// let swarm_builder = SwarmBuilder::new_without_plugins()
97    ///     .add_plugins(azalea::DefaultPlugins.build().disable::<azalea::chat_signing::ChatSigningPlugin>())
98    ///     .add_plugins(azalea::bot::DefaultBotPlugins)
99    ///     .add_plugins(azalea::swarm::DefaultSwarmPlugins);
100    /// # swarm_builder.set_handler(handle).set_swarm_handler(swarm_handle);
101    /// # #[derive(Clone, Component, Default, Resource)]
102    /// # pub struct State;
103    /// # async fn handle(mut bot: Client, event: Event, state: State) -> anyhow::Result<()> {
104    /// #     Ok(())
105    /// # }
106    /// # async fn swarm_handle(swarm: Swarm, event: SwarmEvent, state: State) -> anyhow::Result<()> {
107    /// #     Ok(())
108    /// # }
109    /// ```
110    ///
111    /// [`ClientBuilder::new_without_plugins`]: crate::ClientBuilder::new_without_plugins
112    #[must_use]
113    pub fn new_without_plugins() -> Self {
114        SwarmBuilder {
115            // we create the app here so plugins can add onto it.
116            // the schedules won't run until [`Self::start`] is called.
117
118            // `App::new()` is used instead of `SubApp::new()` so the necessary resources are
119            // initialized
120            app: mem::take(App::new().main_mut()),
121            accounts: Vec::new(),
122            states: Vec::new(),
123            swarm_state: NoSwarmState,
124            handler: None,
125            swarm_handler: None,
126            join_delay: None,
127            reconnect_after: Some(DEFAULT_RECONNECT_DELAY),
128        }
129    }
130}
131
132impl<SS, SR> SwarmBuilder<NoState, SS, (), SR>
133where
134    SS: Default + Send + Sync + Clone + Resource + 'static,
135{
136    /// Set the function that's called every time a bot receives an
137    /// [`Event`](crate::Event). This is the intended way to handle
138    /// normal per-bot events.
139    ///
140    /// Currently you can have up to one handler.
141    ///
142    /// Note that if you're creating clients directly from the ECS using
143    /// [`StartJoinServerEvent`] and the client wasn't already in the ECS, then
144    /// the handler function won't be called for that client. This also applies
145    /// to [`SwarmBuilder::set_swarm_handler`]. This shouldn't be a concern for
146    /// most bots, though.
147    ///
148    /// ```
149    /// # use azalea::{prelude::*, swarm::prelude::*};
150    /// # let swarm_builder = SwarmBuilder::new().set_swarm_handler(swarm_handle);
151    /// swarm_builder.set_handler(handle);
152    ///
153    /// #[derive(Clone, Component, Default)]
154    /// struct State {}
155    /// async fn handle(mut bot: Client, event: Event, state: State) -> anyhow::Result<()> {
156    ///     Ok(())
157    /// }
158    ///
159    /// # #[derive(Clone, Default, Resource)]
160    /// # struct SwarmState {}
161    /// # async fn swarm_handle(
162    /// #     mut swarm: Swarm,
163    /// #     event: SwarmEvent,
164    /// #     state: SwarmState,
165    /// # ) -> anyhow::Result<()> {
166    /// #     Ok(())
167    /// # }
168    /// ```
169    ///
170    /// [`StartJoinServerEvent`]: azalea_client::join::StartJoinServerEvent
171    #[must_use]
172    pub fn set_handler<S, Fut, R>(self, handler: HandleFn<S, Fut>) -> SwarmBuilder<S, SS, R, SR>
173    where
174        Fut: Future<Output = R> + Send + 'static,
175        S: Send + Sync + Clone + Component + Default + 'static,
176    {
177        SwarmBuilder {
178            handler: Some(Box::new(move |bot, event, state: S| {
179                Box::pin(handler(bot, event, state))
180            })),
181            // if we added accounts before the State was set, we've gotta set it to the default now
182            states: vec![S::default(); self.accounts.len()],
183            app: self.app,
184            ..self
185        }
186    }
187}
188
189impl<S, R> SwarmBuilder<S, NoSwarmState, R, ()>
190where
191    S: Send + Sync + Clone + Component + 'static,
192{
193    /// Set the function that's called every time the swarm receives a
194    /// [`SwarmEvent`]. This is the intended way to handle global swarm events.
195    ///
196    /// Currently you can have up to one swarm handler.
197    ///
198    /// Note that if you're creating clients directly from the ECS using
199    /// [`StartJoinServerEvent`] and the client wasn't already in the ECS, then
200    /// this handler function won't be called for that client. This also applies
201    /// to [`SwarmBuilder::set_handler`]. This shouldn't be a concern for
202    /// most bots, though.
203    ///
204    /// ```
205    /// # use azalea::{prelude::*, swarm::prelude::*};
206    /// # let swarm_builder = SwarmBuilder::new().set_handler(handle);
207    /// swarm_builder.set_swarm_handler(swarm_handle);
208    ///
209    /// # #[derive(Clone, Component, Default)]
210    /// # struct State {}
211    ///
212    /// # async fn handle(mut bot: Client, event: Event, state: State) -> anyhow::Result<()> {
213    /// #     Ok(())
214    /// # }
215    ///
216    /// #[derive(Clone, Default, Resource)]
217    /// struct SwarmState {}
218    /// async fn swarm_handle(
219    ///     mut swarm: Swarm,
220    ///     event: SwarmEvent,
221    ///     state: SwarmState,
222    /// ) -> anyhow::Result<()> {
223    ///     Ok(())
224    /// }
225    /// ```
226    ///
227    /// [`StartJoinServerEvent`]: azalea_client::join::StartJoinServerEvent
228    #[must_use]
229    pub fn set_swarm_handler<SS, Fut, SR>(
230        self,
231        handler: SwarmHandleFn<SS, Fut>,
232    ) -> SwarmBuilder<S, SS, R, SR>
233    where
234        SS: Default + Send + Sync + Clone + Resource + 'static,
235        Fut: Future<Output = SR> + Send + 'static,
236    {
237        SwarmBuilder {
238            handler: self.handler,
239            app: self.app,
240            accounts: self.accounts,
241            states: self.states,
242            swarm_state: SS::default(),
243            swarm_handler: Some(Box::new(move |swarm, event, state| {
244                Box::pin(handler(swarm, event, state))
245            })),
246            join_delay: self.join_delay,
247            reconnect_after: self.reconnect_after,
248        }
249    }
250}
251
252impl<S, SS, R, SR> SwarmBuilder<S, SS, R, SR>
253where
254    S: Send + Sync + Clone + Component + 'static,
255    SS: Default + Send + Sync + Clone + Resource + 'static,
256    R: Send + 'static,
257    SR: Send + 'static,
258{
259    /// Add a vec of [`Account`]s to the swarm.
260    ///
261    /// Use [`Self::add_account`] to only add one account. If you want the
262    /// clients to have different default states, add them one at a time with
263    /// [`Self::add_account_with_state`].
264    ///
265    /// By default, every account will join at the same time, you can add a
266    /// delay with [`Self::join_delay`].
267    #[must_use]
268    pub fn add_accounts(mut self, accounts: Vec<Account>) -> Self
269    where
270        S: Default,
271    {
272        for account in accounts {
273            self = self.add_account(account);
274        }
275
276        self
277    }
278
279    /// Add a single new [`Account`] to the swarm.
280    ///
281    /// Use [`Self::add_accounts`] to add multiple accounts at a time.
282    ///
283    /// This will make the state for this client be the default, use
284    /// [`Self::add_account_with_state`] to avoid that.
285    #[must_use]
286    pub fn add_account(self, account: Account) -> Self
287    where
288        S: Default,
289    {
290        self.add_account_with_state_and_opts(account, S::default(), JoinOpts::default())
291    }
292
293    /// Add an account with a custom initial state.
294    ///
295    /// Use just [`Self::add_account`] to use the `Default` implementation for
296    /// the state.
297    #[must_use]
298    pub fn add_account_with_state(self, account: Account, state: S) -> Self {
299        self.add_account_with_state_and_opts(account, state, JoinOpts::default())
300    }
301
302    /// Add an account with a custom initial state.
303    ///
304    /// Use just [`Self::add_account`] to use the `Default` implementation for
305    /// the state.
306    #[must_use]
307    pub fn add_account_with_opts(self, account: Account, opts: JoinOpts) -> Self
308    where
309        S: Default,
310    {
311        self.add_account_with_state_and_opts(account, S::default(), opts)
312    }
313
314    /// Same as [`Self::add_account_with_state`], but allow passing in custom
315    /// join options.
316    #[must_use]
317    pub fn add_account_with_state_and_opts(
318        mut self,
319        account: Account,
320        state: S,
321        join_opts: JoinOpts,
322    ) -> Self {
323        self.accounts.push((account, join_opts));
324        self.states.push(state);
325        self
326    }
327
328    /// Set the swarm state instead of initializing defaults.
329    #[must_use]
330    pub fn set_swarm_state(mut self, swarm_state: SS) -> Self {
331        self.swarm_state = swarm_state;
332        self
333    }
334
335    /// Add one or more plugins to this swarm.
336    ///
337    /// See [`Self::new_without_plugins`] to learn how to disable default
338    /// plugins.
339    #[must_use]
340    pub fn add_plugins<M>(mut self, plugins: impl Plugins<M>) -> Self {
341        self.app.add_plugins(plugins);
342        self
343    }
344
345    /// Set how long we should wait between each bot joining the server.
346    ///
347    /// By default, every bot will connect at the same time. If you set this
348    /// field, however, the bots will wait for the previous one to have
349    /// connected and *then* they'll wait the given duration.
350    #[must_use]
351    pub fn join_delay(mut self, delay: Duration) -> Self {
352        self.join_delay = Some(delay);
353        self
354    }
355
356    /// Configures the auto-reconnection behavior for our bots.
357    ///
358    /// If this is `Some`, then it'll set the default reconnection delay for our
359    /// bots (how long they'll wait after being kicked before they try
360    /// rejoining). if it's `None`, then auto-reconnecting will be disabled.
361    ///
362    /// If this function isn't called, then our clients will reconnect after
363    /// [`DEFAULT_RECONNECT_DELAY`].
364    #[must_use]
365    pub fn reconnect_after(mut self, delay: impl Into<Option<Duration>>) -> Self {
366        self.reconnect_after = delay.into();
367        self
368    }
369
370    /// Build this `SwarmBuilder` into an actual [`Swarm`] and join the given
371    /// server.
372    ///
373    /// The `address` argument can be a `&str`, [`ServerAddr`],
374    /// [`ResolvedAddr`], or anything else that implements [`ResolvableAddr`].
375    ///
376    /// [`ServerAddr`]: azalea_protocol::address::ServerAddr
377    pub async fn start(self, address: impl ResolvableAddr) -> AppExit {
378        self.start_with_opts(address, JoinOpts::default()).await
379    }
380
381    #[doc(hidden)]
382    #[deprecated = "renamed to `start_with_opts`."]
383    pub async fn start_with_default_opts(
384        self,
385        address: impl ResolvableAddr,
386        default_join_opts: JoinOpts,
387    ) -> AppExit {
388        self.start_with_opts(address, default_join_opts).await
389    }
390
391    /// Do the same as [`Self::start`], but allow passing in default join
392    /// options for the bots.
393    pub async fn start_with_opts(
394        mut self,
395        address: impl ResolvableAddr,
396        join_opts: JoinOpts,
397    ) -> AppExit {
398        assert_eq!(
399            self.accounts.len(),
400            self.states.len(),
401            "There must be exactly one state per bot."
402        );
403
404        debug!("Starting Azalea {}", env!("CARGO_PKG_VERSION"));
405
406        let address = if let Some(socket_addr) = join_opts.custom_socket_addr {
407            let server_addr = if let Some(server_addr) = join_opts
408                .custom_server_addr
409                .clone()
410                .or_else(|| address.clone().server_addr().ok())
411            {
412                server_addr
413            } else {
414                error!(
415                    "Failed to parse address: {address:?}. If this was expected, consider passing in a `ServerAddr` instead."
416                );
417                return AppExit::error();
418            };
419
420            ResolvedAddr {
421                server: server_addr,
422                socket: socket_addr,
423            }
424        } else {
425            let Ok(addr) = address.clone().resolve().await else {
426                error!(
427                    "Failed to resolve address: {address:?}. If this was expected, consider resolving the address earlier with `ResolvableAddr::resolve`."
428                );
429                return AppExit::error();
430            };
431            addr
432        };
433
434        let instance_container = Arc::new(RwLock::new(InstanceContainer::default()));
435
436        // we can't modify the swarm plugins after this
437        let (bots_tx, mut bots_rx) = mpsc::unbounded_channel();
438        let (swarm_tx, mut swarm_rx) = mpsc::unbounded_channel();
439
440        swarm_tx.send(SwarmEvent::Init).unwrap();
441
442        let main_schedule_label = self.app.update_schedule.unwrap();
443
444        let local_set = task::LocalSet::new();
445
446        local_set.run_until(async move {
447            // start_ecs_runner must be run inside of the LocalSet
448            let (ecs_lock, start_running_systems, appexit_rx) = start_ecs_runner(&mut self.app);
449
450            let swarm = Swarm {
451                ecs: ecs_lock.clone(),
452
453                address: Arc::new(RwLock::new(address)),
454                instance_container,
455
456                bots_tx,
457
458                swarm_tx: swarm_tx.clone(),
459            };
460
461            // run the main schedule so the startup systems run
462            {
463                let mut ecs = ecs_lock.write();
464                ecs.insert_resource(swarm.clone());
465                ecs.insert_resource(self.swarm_state.clone());
466                if let Some(reconnect_after) = self.reconnect_after {
467                    ecs.insert_resource(AutoReconnectDelay {
468                        delay: reconnect_after,
469                    });
470                } else {
471                    ecs.remove_resource::<AutoReconnectDelay>();
472                }
473                ecs.run_schedule(main_schedule_label);
474                ecs.clear_trackers();
475            }
476
477            // only do this after we inserted the Swarm and state resources to avoid errors
478            // where Res<Swarm> is inaccessible
479            start_running_systems();
480
481            // SwarmBuilder (self) isn't Send so we have to take all the things we need out
482            // of it
483            let swarm_clone = swarm.clone();
484            let join_delay = self.join_delay;
485            let accounts = self.accounts.clone();
486            let states = self.states.clone();
487
488            task::spawn_local(async move {
489                if let Some(join_delay) = join_delay {
490                    // if there's a join delay, then join one by one
491                    for ((account, bot_join_opts), state) in accounts.iter().zip(states) {
492                        let mut join_opts = join_opts.clone();
493                        join_opts.update(bot_join_opts);
494                        let _ = swarm_clone.add_with_opts(account, state, &join_opts).await;
495                        tokio::time::sleep(join_delay).await;
496                    }
497                } else {
498                    // otherwise, join all at once
499                    let swarm_borrow = &swarm_clone;
500                    join_all(accounts.iter().zip(states).map(
501                        |((account, bot_join_opts), state)| async {
502                            let mut join_opts = join_opts.clone();
503                            join_opts.update(bot_join_opts);
504                            let _ = swarm_borrow
505                                .clone()
506                                .add_with_opts(account, state, &join_opts)
507                                .await;
508                        },
509                    ))
510                    .await;
511                }
512
513                swarm_tx.send(SwarmEvent::Login).unwrap();
514            });
515
516            let swarm_state = self.swarm_state;
517
518            // Watch swarm_rx and send those events to the swarm_handle.
519            let swarm_clone = swarm.clone();
520            let swarm_handler_task = task::spawn_local(async move {
521                while let Some(event) = swarm_rx.recv().await {
522                    if let Some(swarm_handler) = &self.swarm_handler {
523                        task::spawn_local((swarm_handler)(
524                            swarm_clone.clone(),
525                            event,
526                            swarm_state.clone(),
527                        ));
528                    }
529                }
530
531                unreachable!(
532                    "The `Swarm` here contains a sender for the `SwarmEvent`s, so swarm_rx.recv() will never fail"
533                );
534            });
535
536            // bot events
537            let client_handler_task = task::spawn_local(async move {
538                while let Some((Some(first_event), first_bot)) = bots_rx.recv().await {
539                    if bots_rx.len() > 1_000 {
540                        static WARNED: AtomicBool = AtomicBool::new(false);
541                        if !WARNED.swap(true, atomic::Ordering::Relaxed) {
542                            warn!("the Client Event channel has more than 1000 items!")
543                        }
544                    }
545
546                    if let Some(handler) = &self.handler {
547                        let ecs_mutex = first_bot.ecs.clone();
548                        let mut ecs = ecs_mutex.write();
549                        let mut query = ecs.query::<Option<&S>>();
550                        let Ok(Some(first_bot_state)) = query.get(&ecs, first_bot.entity) else {
551                            error!(
552                                "the first bot ({} / {}) is missing the required state component! none of the client handler functions will be called.",
553                                first_bot.username(),
554                                first_bot.entity
555                            );
556                            continue;
557                        };
558                        let first_bot_entity = first_bot.entity;
559                        let first_bot_state = first_bot_state.clone();
560
561                        task::spawn_local((handler)(first_bot, first_event, first_bot_state.clone()));
562
563                        // this makes it not have to keep locking the ecs
564                        let mut states = HashMap::new();
565                        states.insert(first_bot_entity, first_bot_state);
566                        while let Ok((Some(event), bot)) = bots_rx.try_recv() {
567                            let state = match states.entry(bot.entity) {
568                                hash_map::Entry::Occupied(e) => e.into_mut(),
569                                hash_map::Entry::Vacant(e) => {
570                                    let Ok(Some(state)) = query.get(&ecs, bot.entity) else {
571                                        error!(
572                                            "one of our bots ({} / {}) is missing the required state component! its client handler function will not be called.",
573                                            bot.username(),
574                                            bot.entity
575                                        );
576                                        continue;
577                                    };
578                                    let state = state.clone();
579                                    e.insert(state)
580                                }
581                            };
582                            task::spawn_local((handler)(bot, event, state.clone()));
583                        }
584                    }
585                }
586            });
587
588            let app_exit = appexit_rx
589                .await
590                .expect("appexit_tx shouldn't be dropped by the ECS runner before sending");
591
592            swarm_handler_task.abort();
593            client_handler_task.abort();
594
595            app_exit
596        }).await
597    }
598}
599
600impl Default for SwarmBuilder<NoState, NoSwarmState, (), ()> {
601    fn default() -> Self {
602        Self::new()
603    }
604}