azalea/swarm/builder.rs
1use std::{
2 collections::{HashMap, hash_map},
3 mem,
4 sync::{
5 Arc,
6 atomic::{self, AtomicBool},
7 },
8 time::Duration,
9};
10
11use azalea_client::{Account, DefaultPlugins, start_ecs_runner};
12use azalea_protocol::address::{ResolvableAddr, ResolvedAddr};
13use azalea_world::InstanceContainer;
14use bevy_app::{App, AppExit, Plugins, SubApp};
15use bevy_ecs::{component::Component, resource::Resource};
16use futures::future::join_all;
17use parking_lot::RwLock;
18use tokio::{sync::mpsc, task};
19use tracing::{debug, error, warn};
20
21use crate::{
22 BoxHandleFn, HandleFn, JoinOpts, NoState,
23 auto_reconnect::{AutoReconnectDelay, DEFAULT_RECONNECT_DELAY},
24 bot::DefaultBotPlugins,
25 swarm::{
26 BoxSwarmHandleFn, DefaultSwarmPlugins, NoSwarmState, Swarm, SwarmEvent, SwarmHandleFn,
27 },
28};
29
/// Create a new [`Swarm`].
///
/// This is a builder: accounts, per-bot states, handlers, plugins, and timing
/// options are collected here, and nothing joins a server until one of the
/// `start` methods is called.
///
/// The generics of this struct stand for the following:
/// - S: State
/// - SS: Swarm State
/// - R: Return type of the handler
/// - SR: Return type of the swarm handler
///
/// You shouldn't have to manually set them though, they'll be inferred for you.
pub struct SwarmBuilder<S, SS, R, SR>
where
    S: Send + Sync + Clone + Component + 'static,
    SS: Default + Send + Sync + Clone + Resource + 'static,
    Self: Send,
{
    // SubApp is used instead of App to make it Send
    /// The app that plugins are registered on while the swarm is being built.
    pub(crate) app: SubApp,
    /// The accounts that are going to join the server, each paired with the
    /// join options that apply to that specific account.
    pub(crate) accounts: Vec<(Account, JoinOpts)>,
    /// The individual bot states.
    ///
    /// This must be the same length as `accounts`, since each bot gets one
    /// state. The entry at index `i` belongs to the account at index `i`.
    pub(crate) states: Vec<S>,
    /// The state for the overall swarm.
    pub(crate) swarm_state: SS,
    /// The function that's called every time a bot receives an
    /// [`Event`](crate::Event).
    ///
    /// `None` until a handler has been set.
    pub(crate) handler: Option<BoxHandleFn<S, R>>,
    /// The function that's called every time the swarm receives a
    /// [`SwarmEvent`].
    ///
    /// `None` until a swarm handler has been set.
    pub(crate) swarm_handler: Option<BoxSwarmHandleFn<SS, SR>>,

    /// How long we should wait between each bot joining the server.
    ///
    /// If this is None, every bot will connect at the same time. None is
    /// different than a duration of 0, since if a duration is present the
    /// bots will wait for the previous one to be ready.
    pub(crate) join_delay: Option<Duration>,

    /// The default reconnection delay for our bots.
    ///
    /// This will change the value of the [`AutoReconnectDelay`] resource.
    /// When this is `None`, that resource is removed at startup instead.
    pub(crate) reconnect_after: Option<Duration>,
}
74impl SwarmBuilder<NoState, NoSwarmState, (), ()> {
75 /// Start creating the swarm.
76 #[must_use]
77 pub fn new() -> Self {
78 Self::new_without_plugins()
79 .add_plugins(DefaultPlugins)
80 .add_plugins(DefaultBotPlugins)
81 .add_plugins(DefaultSwarmPlugins)
82 }
83
84 /// [`Self::new`] but without adding the plugins by default.
85 ///
86 /// This is useful if you want to disable a default plugin. This also exists
87 /// for `ClientBuilder`, see [`ClientBuilder::new_without_plugins`].
88 ///
89 /// You **must** add [`DefaultPlugins`], [`DefaultBotPlugins`], and
90 /// [`DefaultSwarmPlugins`] to this.
91 ///
92 /// ```
93 /// # use azalea::{prelude::*, swarm::prelude::*};
94 /// use azalea::app::PluginGroup;
95 ///
96 /// let swarm_builder = SwarmBuilder::new_without_plugins()
97 /// .add_plugins(azalea::DefaultPlugins.build().disable::<azalea::chat_signing::ChatSigningPlugin>())
98 /// .add_plugins(azalea::bot::DefaultBotPlugins)
99 /// .add_plugins(azalea::swarm::DefaultSwarmPlugins);
100 /// # swarm_builder.set_handler(handle).set_swarm_handler(swarm_handle);
101 /// # #[derive(Clone, Component, Default, Resource)]
102 /// # pub struct State;
103 /// # async fn handle(mut bot: Client, event: Event, state: State) -> anyhow::Result<()> {
104 /// # Ok(())
105 /// # }
106 /// # async fn swarm_handle(swarm: Swarm, event: SwarmEvent, state: State) -> anyhow::Result<()> {
107 /// # Ok(())
108 /// # }
109 /// ```
110 ///
111 /// [`ClientBuilder::new_without_plugins`]: crate::ClientBuilder::new_without_plugins
112 #[must_use]
113 pub fn new_without_plugins() -> Self {
114 SwarmBuilder {
115 // we create the app here so plugins can add onto it.
116 // the schedules won't run until [`Self::start`] is called.
117
118 // `App::new()` is used instead of `SubApp::new()` so the necessary resources are
119 // initialized
120 app: mem::take(App::new().main_mut()),
121 accounts: Vec::new(),
122 states: Vec::new(),
123 swarm_state: NoSwarmState,
124 handler: None,
125 swarm_handler: None,
126 join_delay: None,
127 reconnect_after: Some(DEFAULT_RECONNECT_DELAY),
128 }
129 }
130}
131
132impl<SS, SR> SwarmBuilder<NoState, SS, (), SR>
133where
134 SS: Default + Send + Sync + Clone + Resource + 'static,
135{
136 /// Set the function that's called every time a bot receives an
137 /// [`Event`](crate::Event). This is the intended way to handle
138 /// normal per-bot events.
139 ///
140 /// Currently you can have up to one handler.
141 ///
142 /// Note that if you're creating clients directly from the ECS using
143 /// [`StartJoinServerEvent`] and the client wasn't already in the ECS, then
144 /// the handler function won't be called for that client. This also applies
145 /// to [`SwarmBuilder::set_swarm_handler`]. This shouldn't be a concern for
146 /// most bots, though.
147 ///
148 /// ```
149 /// # use azalea::{prelude::*, swarm::prelude::*};
150 /// # let swarm_builder = SwarmBuilder::new().set_swarm_handler(swarm_handle);
151 /// swarm_builder.set_handler(handle);
152 ///
153 /// #[derive(Clone, Component, Default)]
154 /// struct State {}
155 /// async fn handle(mut bot: Client, event: Event, state: State) -> anyhow::Result<()> {
156 /// Ok(())
157 /// }
158 ///
159 /// # #[derive(Clone, Default, Resource)]
160 /// # struct SwarmState {}
161 /// # async fn swarm_handle(
162 /// # mut swarm: Swarm,
163 /// # event: SwarmEvent,
164 /// # state: SwarmState,
165 /// # ) -> anyhow::Result<()> {
166 /// # Ok(())
167 /// # }
168 /// ```
169 ///
170 /// [`StartJoinServerEvent`]: azalea_client::join::StartJoinServerEvent
171 #[must_use]
172 pub fn set_handler<S, Fut, R>(self, handler: HandleFn<S, Fut>) -> SwarmBuilder<S, SS, R, SR>
173 where
174 Fut: Future<Output = R> + Send + 'static,
175 S: Send + Sync + Clone + Component + Default + 'static,
176 {
177 SwarmBuilder {
178 handler: Some(Box::new(move |bot, event, state: S| {
179 Box::pin(handler(bot, event, state))
180 })),
181 // if we added accounts before the State was set, we've gotta set it to the default now
182 states: vec![S::default(); self.accounts.len()],
183 app: self.app,
184 ..self
185 }
186 }
187}
188
189impl<S, R> SwarmBuilder<S, NoSwarmState, R, ()>
190where
191 S: Send + Sync + Clone + Component + 'static,
192{
193 /// Set the function that's called every time the swarm receives a
194 /// [`SwarmEvent`]. This is the intended way to handle global swarm events.
195 ///
196 /// Currently you can have up to one swarm handler.
197 ///
198 /// Note that if you're creating clients directly from the ECS using
199 /// [`StartJoinServerEvent`] and the client wasn't already in the ECS, then
200 /// this handler function won't be called for that client. This also applies
201 /// to [`SwarmBuilder::set_handler`]. This shouldn't be a concern for
202 /// most bots, though.
203 ///
204 /// ```
205 /// # use azalea::{prelude::*, swarm::prelude::*};
206 /// # let swarm_builder = SwarmBuilder::new().set_handler(handle);
207 /// swarm_builder.set_swarm_handler(swarm_handle);
208 ///
209 /// # #[derive(Clone, Component, Default)]
210 /// # struct State {}
211 ///
212 /// # async fn handle(mut bot: Client, event: Event, state: State) -> anyhow::Result<()> {
213 /// # Ok(())
214 /// # }
215 ///
216 /// #[derive(Clone, Default, Resource)]
217 /// struct SwarmState {}
218 /// async fn swarm_handle(
219 /// mut swarm: Swarm,
220 /// event: SwarmEvent,
221 /// state: SwarmState,
222 /// ) -> anyhow::Result<()> {
223 /// Ok(())
224 /// }
225 /// ```
226 ///
227 /// [`StartJoinServerEvent`]: azalea_client::join::StartJoinServerEvent
228 #[must_use]
229 pub fn set_swarm_handler<SS, Fut, SR>(
230 self,
231 handler: SwarmHandleFn<SS, Fut>,
232 ) -> SwarmBuilder<S, SS, R, SR>
233 where
234 SS: Default + Send + Sync + Clone + Resource + 'static,
235 Fut: Future<Output = SR> + Send + 'static,
236 {
237 SwarmBuilder {
238 handler: self.handler,
239 app: self.app,
240 accounts: self.accounts,
241 states: self.states,
242 swarm_state: SS::default(),
243 swarm_handler: Some(Box::new(move |swarm, event, state| {
244 Box::pin(handler(swarm, event, state))
245 })),
246 join_delay: self.join_delay,
247 reconnect_after: self.reconnect_after,
248 }
249 }
250}
251
252impl<S, SS, R, SR> SwarmBuilder<S, SS, R, SR>
253where
254 S: Send + Sync + Clone + Component + 'static,
255 SS: Default + Send + Sync + Clone + Resource + 'static,
256 R: Send + 'static,
257 SR: Send + 'static,
258{
259 /// Add a vec of [`Account`]s to the swarm.
260 ///
261 /// Use [`Self::add_account`] to only add one account. If you want the
262 /// clients to have different default states, add them one at a time with
263 /// [`Self::add_account_with_state`].
264 ///
265 /// By default, every account will join at the same time, you can add a
266 /// delay with [`Self::join_delay`].
267 #[must_use]
268 pub fn add_accounts(mut self, accounts: Vec<Account>) -> Self
269 where
270 S: Default,
271 {
272 for account in accounts {
273 self = self.add_account(account);
274 }
275
276 self
277 }
278
279 /// Add a single new [`Account`] to the swarm.
280 ///
281 /// Use [`Self::add_accounts`] to add multiple accounts at a time.
282 ///
283 /// This will make the state for this client be the default, use
284 /// [`Self::add_account_with_state`] to avoid that.
285 #[must_use]
286 pub fn add_account(self, account: Account) -> Self
287 where
288 S: Default,
289 {
290 self.add_account_with_state_and_opts(account, S::default(), JoinOpts::default())
291 }
292
293 /// Add an account with a custom initial state.
294 ///
295 /// Use just [`Self::add_account`] to use the `Default` implementation for
296 /// the state.
297 #[must_use]
298 pub fn add_account_with_state(self, account: Account, state: S) -> Self {
299 self.add_account_with_state_and_opts(account, state, JoinOpts::default())
300 }
301
302 /// Add an account with a custom initial state.
303 ///
304 /// Use just [`Self::add_account`] to use the `Default` implementation for
305 /// the state.
306 #[must_use]
307 pub fn add_account_with_opts(self, account: Account, opts: JoinOpts) -> Self
308 where
309 S: Default,
310 {
311 self.add_account_with_state_and_opts(account, S::default(), opts)
312 }
313
314 /// Same as [`Self::add_account_with_state`], but allow passing in custom
315 /// join options.
316 #[must_use]
317 pub fn add_account_with_state_and_opts(
318 mut self,
319 account: Account,
320 state: S,
321 join_opts: JoinOpts,
322 ) -> Self {
323 self.accounts.push((account, join_opts));
324 self.states.push(state);
325 self
326 }
327
328 /// Set the swarm state instead of initializing defaults.
329 #[must_use]
330 pub fn set_swarm_state(mut self, swarm_state: SS) -> Self {
331 self.swarm_state = swarm_state;
332 self
333 }
334
335 /// Add one or more plugins to this swarm.
336 ///
337 /// See [`Self::new_without_plugins`] to learn how to disable default
338 /// plugins.
339 #[must_use]
340 pub fn add_plugins<M>(mut self, plugins: impl Plugins<M>) -> Self {
341 self.app.add_plugins(plugins);
342 self
343 }
344
345 /// Set how long we should wait between each bot joining the server.
346 ///
347 /// By default, every bot will connect at the same time. If you set this
348 /// field, however, the bots will wait for the previous one to have
349 /// connected and *then* they'll wait the given duration.
350 #[must_use]
351 pub fn join_delay(mut self, delay: Duration) -> Self {
352 self.join_delay = Some(delay);
353 self
354 }
355
356 /// Configures the auto-reconnection behavior for our bots.
357 ///
358 /// If this is `Some`, then it'll set the default reconnection delay for our
359 /// bots (how long they'll wait after being kicked before they try
360 /// rejoining). if it's `None`, then auto-reconnecting will be disabled.
361 ///
362 /// If this function isn't called, then our clients will reconnect after
363 /// [`DEFAULT_RECONNECT_DELAY`].
364 #[must_use]
365 pub fn reconnect_after(mut self, delay: impl Into<Option<Duration>>) -> Self {
366 self.reconnect_after = delay.into();
367 self
368 }
369
370 /// Build this `SwarmBuilder` into an actual [`Swarm`] and join the given
371 /// server.
372 ///
373 /// The `address` argument can be a `&str`, [`ServerAddr`],
374 /// [`ResolvedAddr`], or anything else that implements [`ResolvableAddr`].
375 ///
376 /// [`ServerAddr`]: azalea_protocol::address::ServerAddr
377 pub async fn start(self, address: impl ResolvableAddr) -> AppExit {
378 self.start_with_opts(address, JoinOpts::default()).await
379 }
380
381 #[doc(hidden)]
382 #[deprecated = "renamed to `start_with_opts`."]
383 pub async fn start_with_default_opts(
384 self,
385 address: impl ResolvableAddr,
386 default_join_opts: JoinOpts,
387 ) -> AppExit {
388 self.start_with_opts(address, default_join_opts).await
389 }
390
391 /// Do the same as [`Self::start`], but allow passing in default join
392 /// options for the bots.
393 pub async fn start_with_opts(
394 mut self,
395 address: impl ResolvableAddr,
396 join_opts: JoinOpts,
397 ) -> AppExit {
398 assert_eq!(
399 self.accounts.len(),
400 self.states.len(),
401 "There must be exactly one state per bot."
402 );
403
404 debug!("Starting Azalea {}", env!("CARGO_PKG_VERSION"));
405
406 let address = if let Some(socket_addr) = join_opts.custom_socket_addr {
407 let server_addr = if let Some(server_addr) = join_opts
408 .custom_server_addr
409 .clone()
410 .or_else(|| address.clone().server_addr().ok())
411 {
412 server_addr
413 } else {
414 error!(
415 "Failed to parse address: {address:?}. If this was expected, consider passing in a `ServerAddr` instead."
416 );
417 return AppExit::error();
418 };
419
420 ResolvedAddr {
421 server: server_addr,
422 socket: socket_addr,
423 }
424 } else {
425 let Ok(addr) = address.clone().resolve().await else {
426 error!(
427 "Failed to resolve address: {address:?}. If this was expected, consider resolving the address earlier with `ResolvableAddr::resolve`."
428 );
429 return AppExit::error();
430 };
431 addr
432 };
433
434 let instance_container = Arc::new(RwLock::new(InstanceContainer::default()));
435
436 // we can't modify the swarm plugins after this
437 let (bots_tx, mut bots_rx) = mpsc::unbounded_channel();
438 let (swarm_tx, mut swarm_rx) = mpsc::unbounded_channel();
439
440 swarm_tx.send(SwarmEvent::Init).unwrap();
441
442 let main_schedule_label = self.app.update_schedule.unwrap();
443
444 let local_set = task::LocalSet::new();
445
446 local_set.run_until(async move {
447 // start_ecs_runner must be run inside of the LocalSet
448 let (ecs_lock, start_running_systems, appexit_rx) = start_ecs_runner(&mut self.app);
449
450 let swarm = Swarm {
451 ecs: ecs_lock.clone(),
452
453 address: Arc::new(RwLock::new(address)),
454 instance_container,
455
456 bots_tx,
457
458 swarm_tx: swarm_tx.clone(),
459 };
460
461 // run the main schedule so the startup systems run
462 {
463 let mut ecs = ecs_lock.write();
464 ecs.insert_resource(swarm.clone());
465 ecs.insert_resource(self.swarm_state.clone());
466 if let Some(reconnect_after) = self.reconnect_after {
467 ecs.insert_resource(AutoReconnectDelay {
468 delay: reconnect_after,
469 });
470 } else {
471 ecs.remove_resource::<AutoReconnectDelay>();
472 }
473 ecs.run_schedule(main_schedule_label);
474 ecs.clear_trackers();
475 }
476
477 // only do this after we inserted the Swarm and state resources to avoid errors
478 // where Res<Swarm> is inaccessible
479 start_running_systems();
480
481 // SwarmBuilder (self) isn't Send so we have to take all the things we need out
482 // of it
483 let swarm_clone = swarm.clone();
484 let join_delay = self.join_delay;
485 let accounts = self.accounts.clone();
486 let states = self.states.clone();
487
488 task::spawn_local(async move {
489 if let Some(join_delay) = join_delay {
490 // if there's a join delay, then join one by one
491 for ((account, bot_join_opts), state) in accounts.iter().zip(states) {
492 let mut join_opts = join_opts.clone();
493 join_opts.update(bot_join_opts);
494 let _ = swarm_clone.add_with_opts(account, state, &join_opts).await;
495 tokio::time::sleep(join_delay).await;
496 }
497 } else {
498 // otherwise, join all at once
499 let swarm_borrow = &swarm_clone;
500 join_all(accounts.iter().zip(states).map(
501 |((account, bot_join_opts), state)| async {
502 let mut join_opts = join_opts.clone();
503 join_opts.update(bot_join_opts);
504 let _ = swarm_borrow
505 .clone()
506 .add_with_opts(account, state, &join_opts)
507 .await;
508 },
509 ))
510 .await;
511 }
512
513 swarm_tx.send(SwarmEvent::Login).unwrap();
514 });
515
516 let swarm_state = self.swarm_state;
517
518 // Watch swarm_rx and send those events to the swarm_handle.
519 let swarm_clone = swarm.clone();
520 let swarm_handler_task = task::spawn_local(async move {
521 while let Some(event) = swarm_rx.recv().await {
522 if let Some(swarm_handler) = &self.swarm_handler {
523 task::spawn_local((swarm_handler)(
524 swarm_clone.clone(),
525 event,
526 swarm_state.clone(),
527 ));
528 }
529 }
530
531 unreachable!(
532 "The `Swarm` here contains a sender for the `SwarmEvent`s, so swarm_rx.recv() will never fail"
533 );
534 });
535
536 // bot events
537 let client_handler_task = task::spawn_local(async move {
538 while let Some((Some(first_event), first_bot)) = bots_rx.recv().await {
539 if bots_rx.len() > 1_000 {
540 static WARNED: AtomicBool = AtomicBool::new(false);
541 if !WARNED.swap(true, atomic::Ordering::Relaxed) {
542 warn!("the Client Event channel has more than 1000 items!")
543 }
544 }
545
546 if let Some(handler) = &self.handler {
547 let ecs_mutex = first_bot.ecs.clone();
548 let mut ecs = ecs_mutex.write();
549 let mut query = ecs.query::<Option<&S>>();
550 let Ok(Some(first_bot_state)) = query.get(&ecs, first_bot.entity) else {
551 error!(
552 "the first bot ({} / {}) is missing the required state component! none of the client handler functions will be called.",
553 first_bot.username(),
554 first_bot.entity
555 );
556 continue;
557 };
558 let first_bot_entity = first_bot.entity;
559 let first_bot_state = first_bot_state.clone();
560
561 task::spawn_local((handler)(first_bot, first_event, first_bot_state.clone()));
562
563 // this makes it not have to keep locking the ecs
564 let mut states = HashMap::new();
565 states.insert(first_bot_entity, first_bot_state);
566 while let Ok((Some(event), bot)) = bots_rx.try_recv() {
567 let state = match states.entry(bot.entity) {
568 hash_map::Entry::Occupied(e) => e.into_mut(),
569 hash_map::Entry::Vacant(e) => {
570 let Ok(Some(state)) = query.get(&ecs, bot.entity) else {
571 error!(
572 "one of our bots ({} / {}) is missing the required state component! its client handler function will not be called.",
573 bot.username(),
574 bot.entity
575 );
576 continue;
577 };
578 let state = state.clone();
579 e.insert(state)
580 }
581 };
582 task::spawn_local((handler)(bot, event, state.clone()));
583 }
584 }
585 }
586 });
587
588 let app_exit = appexit_rx
589 .await
590 .expect("appexit_tx shouldn't be dropped by the ECS runner before sending");
591
592 swarm_handler_task.abort();
593 client_handler_task.abort();
594
595 app_exit
596 }).await
597 }
598}
599
600impl Default for SwarmBuilder<NoState, NoSwarmState, (), ()> {
601 fn default() -> Self {
602 Self::new()
603 }
604}