azalea/swarm/builder.rs
1use std::{
2 collections::{HashMap, hash_map},
3 mem,
4 sync::{
5 Arc,
6 atomic::{self, AtomicBool},
7 },
8 time::Duration,
9};
10
11use azalea_client::{DefaultPlugins, account::Account, start_ecs_runner};
12use azalea_protocol::address::{ResolvableAddr, ResolvedAddr};
13use azalea_world::Worlds;
14use bevy_app::{App, AppExit, Plugins, SubApp};
15use bevy_ecs::{component::Component, resource::Resource};
16use futures::future::join_all;
17use parking_lot::RwLock;
18use tokio::{sync::mpsc, task};
19use tracing::{debug, error, warn};
20
21use crate::{
22 BoxHandleFn, HandleFn, JoinOpts, NoState,
23 auto_reconnect::{AutoReconnectDelay, DEFAULT_RECONNECT_DELAY},
24 bot::DefaultBotPlugins,
25 swarm::{
26 BoxSwarmHandleFn, DefaultSwarmPlugins, NoSwarmState, Swarm, SwarmEvent, SwarmHandleFn,
27 },
28};
29
30/// Create a new [`Swarm`].
31///
32/// The generics of this struct stand for the following:
33/// - S: State
34/// - SS: Swarm State
35/// - R: Return type of the handler
36/// - SR: Return type of the swarm handler
37///
38/// You shouldn't have to manually set them though, they'll be inferred for you.
39pub struct SwarmBuilder<S, SS, R, SR>
40where
41 S: Send + Sync + Clone + Component + 'static,
42 SS: Default + Send + Sync + Clone + Resource + 'static,
43 Self: Send,
44{
45 // SubApp is used instead of App to make it Send
46 pub(crate) app: SubApp,
47 /// The accounts and proxies that are going to join the server.
48 pub(crate) accounts: Vec<(Account, JoinOpts)>,
49 /// The individual bot states.
50 ///
51 /// This must be the same length as `accounts`, since each bot gets one
52 /// state.
53 pub(crate) states: Vec<S>,
54 /// The state for the overall swarm.
55 pub(crate) swarm_state: SS,
56 /// The function that's called every time a bot receives an [`Event`].
57 pub(crate) handler: Option<BoxHandleFn<S, R>>,
58 /// The function that's called every time the swarm receives a
59 /// [`SwarmEvent`].
60 pub(crate) swarm_handler: Option<BoxSwarmHandleFn<SS, SR>>,
61
62 /// How long we should wait between each bot joining the server.
63 ///
64 /// If this is None, every bot will connect at the same time. None is
65 /// different than a duration of 0, since if a duration is present the
66 /// bots will wait for the previous one to be ready.
67 pub(crate) join_delay: Option<Duration>,
68
69 /// The default reconnection delay for our bots.
70 ///
71 /// This will change the value of the [`AutoReconnectDelay`] resource.
72 pub(crate) reconnect_after: Option<Duration>,
73}
74impl SwarmBuilder<NoState, NoSwarmState, (), ()> {
75 /// Start creating the swarm.
76 #[must_use]
77 pub fn new() -> Self {
78 Self::new_without_plugins().add_plugins((
79 DefaultPlugins,
80 DefaultBotPlugins,
81 DefaultSwarmPlugins,
82 ))
83 }
84
85 /// [`Self::new`] but without adding the plugins by default.
86 ///
87 /// This is useful if you want to disable a default plugin. This also exists
88 /// for `ClientBuilder`, see [`ClientBuilder::new_without_plugins`].
89 ///
90 /// You **must** add [`DefaultPlugins`], [`DefaultBotPlugins`], and
91 /// [`DefaultSwarmPlugins`] to this.
92 ///
93 /// ```
94 /// # use azalea::{prelude::*, swarm::{prelude::*, DefaultSwarmPlugins}, bot::DefaultBotPlugins};
95 /// use azalea::app::PluginGroup;
96 ///
97 /// let swarm_builder = SwarmBuilder::new_without_plugins()
98 /// .add_plugins((
99 /// DefaultBotPlugins,
100 /// DefaultSwarmPlugins,
101 /// azalea::DefaultPlugins
102 /// .build()
103 /// .disable::<azalea::chat_signing::ChatSigningPlugin>(),
104 /// ));
105 /// # swarm_builder.set_handler(handle).set_swarm_handler(swarm_handle);
106 /// # #[derive(Clone, Component, Default, Resource)]
107 /// # pub struct State;
108 /// # async fn handle(mut bot: Client, event: Event, state: State) -> anyhow::Result<()> {
109 /// # Ok(())
110 /// # }
111 /// # async fn swarm_handle(swarm: Swarm, event: SwarmEvent, state: State) -> anyhow::Result<()> {
112 /// # Ok(())
113 /// # }
114 /// ```
115 ///
116 /// [`ClientBuilder::new_without_plugins`]: crate::ClientBuilder::new_without_plugins
117 #[must_use]
118 pub fn new_without_plugins() -> Self {
119 SwarmBuilder {
120 // we create the app here so plugins can add onto it.
121 // the schedules won't run until [`Self::start`] is called.
122
123 // `App::new()` is used instead of `SubApp::new()` so the necessary resources are
124 // initialized
125 app: mem::take(App::new().main_mut()),
126 accounts: Vec::new(),
127 states: Vec::new(),
128 swarm_state: NoSwarmState,
129 handler: None,
130 swarm_handler: None,
131 join_delay: None,
132 reconnect_after: Some(DEFAULT_RECONNECT_DELAY),
133 }
134 }
135}
136
137impl<SS, SR> SwarmBuilder<NoState, SS, (), SR>
138where
139 SS: Default + Send + Sync + Clone + Resource + 'static,
140{
141 /// Set the function that's called every time a bot receives an
142 /// [`Event`](crate::Event). This is the intended way to handle
143 /// normal per-bot events.
144 ///
145 /// Currently you can have up to one handler.
146 ///
147 /// Note that if you're creating clients directly from the ECS using
148 /// [`StartJoinServerEvent`] and the client wasn't already in the ECS, then
149 /// the handler function won't be called for that client. This also applies
150 /// to [`SwarmBuilder::set_swarm_handler`]. This shouldn't be a concern for
151 /// most bots, though.
152 ///
153 /// ```
154 /// # use azalea::{prelude::*, swarm::prelude::*};
155 /// # let swarm_builder = SwarmBuilder::new().set_swarm_handler(swarm_handle);
156 /// swarm_builder.set_handler(handle);
157 ///
158 /// #[derive(Clone, Component, Default)]
159 /// struct State {}
160 /// async fn handle(mut bot: Client, event: Event, state: State) -> anyhow::Result<()> {
161 /// Ok(())
162 /// }
163 ///
164 /// # #[derive(Clone, Default, Resource)]
165 /// # struct SwarmState {}
166 /// # async fn swarm_handle(
167 /// # mut swarm: Swarm,
168 /// # event: SwarmEvent,
169 /// # state: SwarmState,
170 /// # ) -> anyhow::Result<()> {
171 /// # Ok(())
172 /// # }
173 /// ```
174 ///
175 /// [`StartJoinServerEvent`]: azalea_client::join::StartJoinServerEvent
176 #[must_use]
177 pub fn set_handler<S, Fut, R>(self, handler: HandleFn<S, Fut>) -> SwarmBuilder<S, SS, R, SR>
178 where
179 Fut: Future<Output = R> + Send + 'static,
180 S: Send + Sync + Clone + Component + Default + 'static,
181 {
182 SwarmBuilder {
183 handler: Some(Box::new(move |bot, event, state: S| {
184 Box::pin(handler(bot, event, state))
185 })),
186 // if we added accounts before the State was set, we've gotta set it to the default now
187 states: vec![S::default(); self.accounts.len()],
188 app: self.app,
189 ..self
190 }
191 }
192}
193
194impl<S, R> SwarmBuilder<S, NoSwarmState, R, ()>
195where
196 S: Send + Sync + Clone + Component + 'static,
197{
198 /// Set the function that's called every time the swarm receives a
199 /// [`SwarmEvent`]. This is the intended way to handle global swarm events.
200 ///
201 /// Currently you can have up to one swarm handler.
202 ///
203 /// Note that if you're creating clients directly from the ECS using
204 /// [`StartJoinServerEvent`] and the client wasn't already in the ECS, then
205 /// this handler function won't be called for that client. This also applies
206 /// to [`SwarmBuilder::set_handler`]. This shouldn't be a concern for
207 /// most bots, though.
208 ///
209 /// ```
210 /// # use azalea::{prelude::*, swarm::prelude::*};
211 /// # let swarm_builder = SwarmBuilder::new().set_handler(handle);
212 /// swarm_builder.set_swarm_handler(swarm_handle);
213 ///
214 /// # #[derive(Clone, Component, Default)]
215 /// # struct State {}
216 ///
217 /// # async fn handle(mut bot: Client, event: Event, state: State) -> anyhow::Result<()> {
218 /// # Ok(())
219 /// # }
220 ///
221 /// #[derive(Clone, Default, Resource)]
222 /// struct SwarmState {}
223 /// async fn swarm_handle(
224 /// mut swarm: Swarm,
225 /// event: SwarmEvent,
226 /// state: SwarmState,
227 /// ) -> anyhow::Result<()> {
228 /// Ok(())
229 /// }
230 /// ```
231 ///
232 /// [`StartJoinServerEvent`]: azalea_client::join::StartJoinServerEvent
233 #[must_use]
234 pub fn set_swarm_handler<SS, Fut, SR>(
235 self,
236 handler: SwarmHandleFn<SS, Fut>,
237 ) -> SwarmBuilder<S, SS, R, SR>
238 where
239 SS: Default + Send + Sync + Clone + Resource + 'static,
240 Fut: Future<Output = SR> + Send + 'static,
241 {
242 SwarmBuilder {
243 handler: self.handler,
244 app: self.app,
245 accounts: self.accounts,
246 states: self.states,
247 swarm_state: SS::default(),
248 swarm_handler: Some(Box::new(move |swarm, event, state| {
249 Box::pin(handler(swarm, event, state))
250 })),
251 join_delay: self.join_delay,
252 reconnect_after: self.reconnect_after,
253 }
254 }
255}
256
257impl<S, SS, R, SR> SwarmBuilder<S, SS, R, SR>
258where
259 S: Send + Sync + Clone + Component + 'static,
260 SS: Default + Send + Sync + Clone + Resource + 'static,
261 R: Send + 'static,
262 SR: Send + 'static,
263{
264 /// Add a vec of [`Account`]s to the swarm.
265 ///
266 /// Use [`Self::add_account`] to only add one account. If you want the
267 /// clients to have different default states, add them one at a time with
268 /// [`Self::add_account_with_state`].
269 ///
270 /// By default, every account will join at the same time, you can add a
271 /// delay with [`Self::join_delay`].
272 #[must_use]
273 pub fn add_accounts(mut self, accounts: Vec<Account>) -> Self
274 where
275 S: Default,
276 {
277 for account in accounts {
278 self = self.add_account(account);
279 }
280
281 self
282 }
283
284 /// Add a single new [`Account`] to the swarm.
285 ///
286 /// Use [`Self::add_accounts`] to add multiple accounts at a time.
287 ///
288 /// This will make the state for this client be the default, use
289 /// [`Self::add_account_with_state`] to avoid that.
290 #[must_use]
291 pub fn add_account(self, account: Account) -> Self
292 where
293 S: Default,
294 {
295 self.add_account_with_state_and_opts(account, S::default(), JoinOpts::default())
296 }
297
298 /// Add an account with a custom initial state.
299 ///
300 /// Use just [`Self::add_account`] to use the `Default` implementation for
301 /// the state.
302 #[must_use]
303 pub fn add_account_with_state(self, account: Account, state: S) -> Self {
304 self.add_account_with_state_and_opts(account, state, JoinOpts::default())
305 }
306
307 /// Add an account with a custom initial state.
308 ///
309 /// Use just [`Self::add_account`] to use the `Default` implementation for
310 /// the state.
311 #[must_use]
312 pub fn add_account_with_opts(self, account: Account, opts: JoinOpts) -> Self
313 where
314 S: Default,
315 {
316 self.add_account_with_state_and_opts(account, S::default(), opts)
317 }
318
319 /// Same as [`Self::add_account_with_state`], but allow passing in custom
320 /// join options.
321 #[must_use]
322 pub fn add_account_with_state_and_opts(
323 mut self,
324 account: Account,
325 state: S,
326 join_opts: JoinOpts,
327 ) -> Self {
328 self.accounts.push((account, join_opts));
329 self.states.push(state);
330 self
331 }
332
333 /// Set the swarm state instead of initializing defaults.
334 #[must_use]
335 pub fn set_swarm_state(mut self, swarm_state: SS) -> Self {
336 self.swarm_state = swarm_state;
337 self
338 }
339
340 /// Add one or more plugins to this swarm.
341 ///
342 /// See [`Self::new_without_plugins`] to learn how to disable default
343 /// plugins.
344 #[must_use]
345 pub fn add_plugins<M>(mut self, plugins: impl Plugins<M>) -> Self {
346 self.app.add_plugins(plugins);
347 self
348 }
349
350 /// Set how long we should wait between each bot joining the server.
351 ///
352 /// By default, every bot will connect at the same time. If you set this
353 /// field, however, the bots will wait for the previous one to have
354 /// connected and *then* they'll wait the given duration.
355 #[must_use]
356 pub fn join_delay(mut self, delay: Duration) -> Self {
357 self.join_delay = Some(delay);
358 self
359 }
360
361 /// Configures the auto-reconnection behavior for our bots.
362 ///
363 /// If this is `Some`, then it'll set the default reconnection delay for our
364 /// bots (how long they'll wait after being kicked before they try
365 /// rejoining). if it's `None`, then auto-reconnecting will be disabled.
366 ///
367 /// If this function isn't called, then our clients will reconnect after
368 /// [`DEFAULT_RECONNECT_DELAY`].
369 #[must_use]
370 pub fn reconnect_after(mut self, delay: impl Into<Option<Duration>>) -> Self {
371 self.reconnect_after = delay.into();
372 self
373 }
374
375 /// Build this `SwarmBuilder` into an actual [`Swarm`] and join the given
376 /// server.
377 ///
378 /// The `address` argument can be a `&str`, [`ServerAddr`],
379 /// [`ResolvedAddr`], or anything else that implements [`ResolvableAddr`].
380 ///
381 /// [`ServerAddr`]: ../../azalea_protocol/address/struct.ServerAddr.html
382 /// [`ResolvedAddr`]: ../../azalea_protocol/address/struct.ResolvedAddr.html
383 /// [`ResolvableAddr`]: ../../azalea_protocol/address/trait.ResolvableAddr.html
384 pub async fn start(self, address: impl ResolvableAddr) -> AppExit {
385 self.start_with_opts(address, JoinOpts::default()).await
386 }
387
388 #[doc(hidden)]
389 #[deprecated = "renamed to `start_with_opts`."]
390 pub async fn start_with_default_opts(
391 self,
392 address: impl ResolvableAddr,
393 default_join_opts: JoinOpts,
394 ) -> AppExit {
395 self.start_with_opts(address, default_join_opts).await
396 }
397
398 /// Do the same as [`Self::start`], but allow passing in default join
399 /// options for the bots.
400 pub async fn start_with_opts(
401 mut self,
402 address: impl ResolvableAddr,
403 join_opts: JoinOpts,
404 ) -> AppExit {
405 assert_eq!(
406 self.accounts.len(),
407 self.states.len(),
408 "There must be exactly one state per bot."
409 );
410
411 debug!("Starting Azalea {}", env!("CARGO_PKG_VERSION"));
412
413 let address = if let Some(socket_addr) = join_opts.custom_socket_addr {
414 let server_addr = if let Some(server_addr) = join_opts
415 .custom_server_addr
416 .clone()
417 .or_else(|| address.clone().server_addr().ok())
418 {
419 server_addr
420 } else {
421 error!(
422 "Failed to parse address: {address:?}. If this was expected, consider passing in a `ServerAddr` instead."
423 );
424 return AppExit::error();
425 };
426
427 ResolvedAddr {
428 server: server_addr,
429 socket: socket_addr,
430 }
431 } else {
432 let Ok(addr) = address.clone().resolve().await else {
433 error!(
434 "Failed to resolve address: {address:?}. If this was expected, consider resolving the address earlier with `ResolvableAddr::resolve`."
435 );
436 return AppExit::error();
437 };
438 addr
439 };
440
441 let worlds = Arc::new(RwLock::new(Worlds::default()));
442
443 // we can't modify the swarm plugins after this
444 let (bots_tx, mut bots_rx) = mpsc::unbounded_channel();
445 let (swarm_tx, mut swarm_rx) = mpsc::unbounded_channel();
446
447 swarm_tx.send(SwarmEvent::Init).unwrap();
448
449 let main_schedule_label = self.app.update_schedule.unwrap();
450
451 let local_set = task::LocalSet::new();
452
453 local_set.run_until(async move {
454 // start_ecs_runner must be run inside of the LocalSet
455 let (ecs_lock, start_running_systems, appexit_rx) = start_ecs_runner(&mut self.app);
456
457 let swarm = Swarm {
458 ecs: ecs_lock.clone(),
459
460 address: Arc::new(RwLock::new(address)),
461 worlds,
462
463 bots_tx,
464
465 swarm_tx: swarm_tx.clone(),
466 };
467
468 // run the main schedule so the startup systems run
469 {
470 let mut ecs = ecs_lock.write();
471 ecs.insert_resource(swarm.clone());
472 ecs.insert_resource(self.swarm_state.clone());
473 if let Some(reconnect_after) = self.reconnect_after {
474 ecs.insert_resource(AutoReconnectDelay {
475 delay: reconnect_after,
476 });
477 } else {
478 ecs.remove_resource::<AutoReconnectDelay>();
479 }
480 ecs.run_schedule(main_schedule_label);
481 ecs.clear_trackers();
482 }
483
484 // only do this after we inserted the Swarm and state resources to avoid errors
485 // where Res<Swarm> is inaccessible
486 start_running_systems();
487
488 // SwarmBuilder (self) isn't Send so we have to take all the things we need out
489 // of it
490 let swarm_clone = swarm.clone();
491 let join_delay = self.join_delay;
492 let accounts = self.accounts.clone();
493 let states = self.states.clone();
494
495 task::spawn_local(async move {
496 if let Some(join_delay) = join_delay {
497 // if there's a join delay, then join one by one
498 for ((account, bot_join_opts), state) in accounts.iter().zip(states) {
499 let mut join_opts = join_opts.clone();
500 join_opts.update(bot_join_opts);
501 let _ = swarm_clone.add_with_opts(account, state, &join_opts).await;
502 tokio::time::sleep(join_delay).await;
503 }
504 } else {
505 // otherwise, join all at once
506 let swarm_borrow = &swarm_clone;
507 join_all(accounts.iter().zip(states).map(
508 |((account, bot_join_opts), state)| async {
509 let mut join_opts = join_opts.clone();
510 join_opts.update(bot_join_opts);
511 let _ = swarm_borrow
512 .clone()
513 .add_with_opts(account, state, &join_opts)
514 .await;
515 },
516 ))
517 .await;
518 }
519
520 swarm_tx.send(SwarmEvent::Login).unwrap();
521 });
522
523 let swarm_state = self.swarm_state;
524
525 // Watch swarm_rx and send those events to the swarm_handle.
526 let swarm_clone = swarm.clone();
527 let swarm_handler_task = task::spawn_local(async move {
528 while let Some(event) = swarm_rx.recv().await {
529 if let Some(swarm_handler) = &self.swarm_handler {
530 task::spawn_local((swarm_handler)(
531 swarm_clone.clone(),
532 event,
533 swarm_state.clone(),
534 ));
535 }
536 }
537
538 unreachable!(
539 "The `Swarm` here contains a sender for the `SwarmEvent`s, so swarm_rx.recv() will never fail"
540 );
541 });
542
543 // bot events
544 let client_handler_task = task::spawn_local(async move {
545 while let Some((Some(first_event), first_bot)) = bots_rx.recv().await {
546 if bots_rx.len() > 1_000 {
547 static WARNED: AtomicBool = AtomicBool::new(false);
548 if !WARNED.swap(true, atomic::Ordering::Relaxed) {
549 warn!("the Client Event channel has more than 1000 items!")
550 }
551 }
552
553 if let Some(handler) = &self.handler {
554 let ecs_mutex = first_bot.ecs.clone();
555 let mut ecs = ecs_mutex.write();
556 let mut query = ecs.query::<Option<&S>>();
557 let Ok(Some(first_bot_state)) = query.get(&ecs, first_bot.entity) else {
558 error!(
559 "the first bot ({} / {}) is missing the required state component! none of the client handler functions will be called.",
560 first_bot.username(),
561 first_bot.entity
562 );
563 continue;
564 };
565 let first_bot_entity = first_bot.entity;
566 let first_bot_state = first_bot_state.clone();
567
568 task::spawn_local((handler)(first_bot, first_event, first_bot_state.clone()));
569
570 // this makes it not have to keep locking the ecs
571 let mut states = HashMap::new();
572 states.insert(first_bot_entity, first_bot_state);
573 while let Ok((Some(event), bot)) = bots_rx.try_recv() {
574 let state = match states.entry(bot.entity) {
575 hash_map::Entry::Occupied(e) => e.into_mut(),
576 hash_map::Entry::Vacant(e) => {
577 let Ok(Some(state)) = query.get(&ecs, bot.entity) else {
578 error!(
579 "one of our bots ({} / {}) is missing the required state component! its client handler function will not be called.",
580 bot.username(),
581 bot.entity
582 );
583 continue;
584 };
585 let state = state.clone();
586 e.insert(state)
587 }
588 };
589 task::spawn_local((handler)(bot, event, state.clone()));
590 }
591 }
592 }
593 });
594
595 let app_exit = appexit_rx
596 .await
597 .expect("appexit_tx shouldn't be dropped by the ECS runner before sending");
598
599 swarm_handler_task.abort();
600 client_handler_task.abort();
601
602 app_exit
603 }).await
604 }
605}
606
607impl Default for SwarmBuilder<NoState, NoSwarmState, (), ()> {
608 fn default() -> Self {
609 Self::new()
610 }
611}