azalea/swarm/mod.rs
1//! Swarms are a way to conveniently control many bots.
2//!
3//! See [`Swarm`] for more information.
4
5mod chat;
6mod events;
7pub mod prelude;
8
9use std::{
10 collections::{HashMap, hash_map},
11 future::Future,
12 mem,
13 net::SocketAddr,
14 sync::{
15 Arc,
16 atomic::{self, AtomicBool},
17 },
18 time::Duration,
19};
20
21use azalea_client::{
22 Account, Client, DefaultPlugins, Event, JoinError, StartClientOpts,
23 auto_reconnect::{AutoReconnectDelay, DEFAULT_RECONNECT_DELAY},
24 chat::ChatPacket,
25 join::ConnectOpts,
26 start_ecs_runner,
27};
28use azalea_entity::LocalEntity;
29use azalea_protocol::{ServerAddress, resolver};
30use azalea_world::InstanceContainer;
31use bevy_app::{App, AppExit, PluginGroup, PluginGroupBuilder, Plugins, SubApp};
32use bevy_ecs::prelude::*;
33use futures::future::{BoxFuture, join_all};
34use parking_lot::{Mutex, RwLock};
35use tokio::{sync::mpsc, time::sleep};
36use tracing::{debug, error, warn};
37
38use crate::{BoxHandleFn, DefaultBotPlugins, HandleFn, JoinOpts, NoState, StartError};
39
40/// A swarm is a way to conveniently control many bots at once, while also
41/// being able to control bots at an individual level when desired.
42///
43/// It can safely be cloned, so there should be no need to wrap them in a Mutex.
44///
45/// Swarms are created from [`SwarmBuilder`].
46///
47/// Clients can be added to the swarm later via [`Swarm::add`], and can be
48/// removed with [`Client::disconnect`].
49#[derive(Clone, Resource)]
50pub struct Swarm {
51 pub ecs_lock: Arc<Mutex<World>>,
52
53 // the address is public and mutable so plugins can change it
54 pub resolved_address: Arc<RwLock<SocketAddr>>,
55 pub address: Arc<RwLock<ServerAddress>>,
56
57 pub instance_container: Arc<RwLock<InstanceContainer>>,
58
59 /// This is used internally to make the client handler function work.
60 bots_tx: mpsc::UnboundedSender<(Option<Event>, Client)>,
61 /// This is used internally to make the swarm handler function work.
62 swarm_tx: mpsc::UnboundedSender<SwarmEvent>,
63}
64
65/// Create a new [`Swarm`].
66///
67/// The generics of this struct stand for the following:
68/// - S: State
69/// - SS: Swarm State
70/// - R: Return type of the handler
71/// - SR: Return type of the swarm handler
72///
73/// You shouldn't have to manually set them though, they'll be inferred for you.
74pub struct SwarmBuilder<S, SS, R, SR>
75where
76 S: Send + Sync + Clone + Component + 'static,
77 SS: Default + Send + Sync + Clone + Resource + 'static,
78 Self: Send,
79{
80 // SubApp is used instead of App to make it Send
81 pub(crate) app: SubApp,
82 /// The accounts and proxies that are going to join the server.
83 pub(crate) accounts: Vec<(Account, JoinOpts)>,
84 /// The individual bot states. This must be the same length as `accounts`,
85 /// since each bot gets one state.
86 pub(crate) states: Vec<S>,
87 /// The state for the overall swarm.
88 pub(crate) swarm_state: SS,
89 /// The function that's called every time a bot receives an [`Event`].
90 pub(crate) handler: Option<BoxHandleFn<S, R>>,
91 /// The function that's called every time the swarm receives a
92 /// [`SwarmEvent`].
93 pub(crate) swarm_handler: Option<BoxSwarmHandleFn<SS, SR>>,
94
95 /// How long we should wait between each bot joining the server. Set to
96 /// None to have every bot connect at the same time. None is different than
97 /// a duration of 0, since if a duration is present the bots will wait for
98 /// the previous one to be ready.
99 pub(crate) join_delay: Option<Duration>,
100
101 /// The default reconnection delay for our bots. This will change the value
102 /// of the `AutoReconnectDelay` resource.
103 pub(crate) reconnect_after: Option<Duration>,
104}
105impl SwarmBuilder<NoState, NoSwarmState, (), ()> {
106 /// Start creating the swarm.
107 #[must_use]
108 pub fn new() -> Self {
109 Self::new_without_plugins()
110 .add_plugins(DefaultPlugins)
111 .add_plugins(DefaultBotPlugins)
112 .add_plugins(DefaultSwarmPlugins)
113 }
114
115 /// [`Self::new`] but without adding the plugins by default.
116 ///
117 /// This is useful if you want to disable a default plugin. This also exists
118 /// for `ClientBuilder`, see [`ClientBuilder::new_without_plugins`].
119 ///
120 /// You **must** add [`DefaultPlugins`], [`DefaultBotPlugins`], and
121 /// [`DefaultSwarmPlugins`] to this.
122 ///
123 /// ```
124 /// # use azalea::{prelude::*, swarm::prelude::*};
125 /// use azalea::app::PluginGroup;
126 ///
127 /// let swarm_builder = SwarmBuilder::new_without_plugins()
128 /// .add_plugins(azalea::DefaultPlugins.build().disable::<azalea::chat_signing::ChatSigningPlugin>())
129 /// .add_plugins(azalea::DefaultBotPlugins)
130 /// .add_plugins(azalea::swarm::DefaultSwarmPlugins);
131 /// # swarm_builder.set_handler(handle).set_swarm_handler(swarm_handle);
132 /// # #[derive(Component, Resource, Clone, Default)]
133 /// # pub struct State;
134 /// # async fn handle(mut bot: Client, event: Event, state: State) -> anyhow::Result<()> {
135 /// # Ok(())
136 /// # }
137 /// # async fn swarm_handle(swarm: Swarm, event: SwarmEvent, state: State) -> anyhow::Result<()> {
138 /// # Ok(())
139 /// # }
140 /// ```
141 ///
142 /// [`ClientBuilder::new_without_plugins`]: crate::ClientBuilder::new_without_plugins
143 #[must_use]
144 pub fn new_without_plugins() -> Self {
145 SwarmBuilder {
146 // we create the app here so plugins can add onto it.
147 // the schedules won't run until [`Self::start`] is called.
148
149 // `App::new()` is used instead of `SubApp::new()` so the necessary resources are
150 // initialized
151 app: mem::take(App::new().main_mut()),
152 accounts: Vec::new(),
153 states: Vec::new(),
154 swarm_state: NoSwarmState,
155 handler: None,
156 swarm_handler: None,
157 join_delay: None,
158 reconnect_after: Some(DEFAULT_RECONNECT_DELAY),
159 }
160 }
161}
162
163impl<SS, SR> SwarmBuilder<NoState, SS, (), SR>
164where
165 SS: Default + Send + Sync + Clone + Resource + 'static,
166{
167 /// Set the function that's called every time a bot receives an
168 /// [`enum@Event`]. This is the way to handle normal per-bot events.
169 ///
170 /// Currently you can have up to one handler.
171 ///
172 /// Note that if you're creating clients directly from the ECS using
173 /// [`StartJoinServerEvent`] and the client wasn't already in the ECS, then
174 /// the handler function won't be called for that client. This also applies
175 /// to [`SwarmBuilder::set_swarm_handler`]. This shouldn't be a concern for
176 /// most bots, though.
177 ///
178 /// ```
179 /// # use azalea::{prelude::*, swarm::prelude::*};
180 /// # let swarm_builder = SwarmBuilder::new().set_swarm_handler(swarm_handle);
181 /// swarm_builder.set_handler(handle);
182 ///
183 /// #[derive(Component, Default, Clone)]
184 /// struct State {}
185 /// async fn handle(mut bot: Client, event: Event, state: State) -> anyhow::Result<()> {
186 /// Ok(())
187 /// }
188 ///
189 /// # #[derive(Resource, Default, Clone)]
190 /// # struct SwarmState {}
191 /// # async fn swarm_handle(
192 /// # mut swarm: Swarm,
193 /// # event: SwarmEvent,
194 /// # state: SwarmState,
195 /// # ) -> anyhow::Result<()> {
196 /// # Ok(())
197 /// # }
198 /// ```
199 ///
200 /// [`StartJoinServerEvent`]: azalea_client::join::StartJoinServerEvent
201 #[must_use]
202 pub fn set_handler<S, Fut, R>(self, handler: HandleFn<S, Fut>) -> SwarmBuilder<S, SS, R, SR>
203 where
204 Fut: Future<Output = R> + Send + 'static,
205 S: Send + Sync + Clone + Component + Default + 'static,
206 {
207 SwarmBuilder {
208 handler: Some(Box::new(move |bot, event, state: S| {
209 Box::pin(handler(bot, event, state))
210 })),
211 // if we added accounts before the State was set, we've gotta set it to the default now
212 states: vec![S::default(); self.accounts.len()],
213 app: self.app,
214 ..self
215 }
216 }
217}
218
219impl<S, R> SwarmBuilder<S, NoSwarmState, R, ()>
220where
221 S: Send + Sync + Clone + Component + 'static,
222{
223 /// Set the function that's called every time the swarm receives a
224 /// [`SwarmEvent`]. This is the way to handle global swarm events.
225 ///
226 /// Currently you can have up to one swarm handler.
227 ///
228 /// Note that if you're creating clients directly from the ECS using
229 /// [`StartJoinServerEvent`] and the client wasn't already in the ECS, then
230 /// this handler function won't be called for that client. This also applies
231 /// to [`SwarmBuilder::set_handler`]. This shouldn't be a concern for
232 /// most bots, though.
233 ///
234 /// ```
235 /// # use azalea::{prelude::*, swarm::prelude::*};
236 /// # let swarm_builder = SwarmBuilder::new().set_handler(handle);
237 /// swarm_builder.set_swarm_handler(swarm_handle);
238 ///
239 /// # #[derive(Component, Default, Clone)]
240 /// # struct State {}
241 ///
242 /// # async fn handle(mut bot: Client, event: Event, state: State) -> anyhow::Result<()> {
243 /// # Ok(())
244 /// # }
245 ///
246 /// #[derive(Resource, Default, Clone)]
247 /// struct SwarmState {}
248 /// async fn swarm_handle(
249 /// mut swarm: Swarm,
250 /// event: SwarmEvent,
251 /// state: SwarmState,
252 /// ) -> anyhow::Result<()> {
253 /// Ok(())
254 /// }
255 /// ```
256 ///
257 /// [`StartJoinServerEvent`]: azalea_client::join::StartJoinServerEvent
258 #[must_use]
259 pub fn set_swarm_handler<SS, Fut, SR>(
260 self,
261 handler: SwarmHandleFn<SS, Fut>,
262 ) -> SwarmBuilder<S, SS, R, SR>
263 where
264 SS: Default + Send + Sync + Clone + Resource + 'static,
265 Fut: Future<Output = SR> + Send + 'static,
266 {
267 SwarmBuilder {
268 handler: self.handler,
269 app: self.app,
270 accounts: self.accounts,
271 states: self.states,
272 swarm_state: SS::default(),
273 swarm_handler: Some(Box::new(move |swarm, event, state| {
274 Box::pin(handler(swarm, event, state))
275 })),
276 join_delay: self.join_delay,
277 reconnect_after: self.reconnect_after,
278 }
279 }
280}
281
282impl<S, SS, R, SR> SwarmBuilder<S, SS, R, SR>
283where
284 S: Send + Sync + Clone + Component + 'static,
285 SS: Default + Send + Sync + Clone + Resource + 'static,
286 R: Send + 'static,
287 SR: Send + 'static,
288{
289 /// Add a vec of [`Account`]s to the swarm.
290 ///
291 /// Use [`Self::add_account`] to only add one account. If you want the
292 /// clients to have different default states, add them one at a time with
293 /// [`Self::add_account_with_state`].
294 ///
295 /// By default, every account will join at the same time, you can add a
296 /// delay with [`Self::join_delay`].
297 #[must_use]
298 pub fn add_accounts(mut self, accounts: Vec<Account>) -> Self
299 where
300 S: Default,
301 {
302 for account in accounts {
303 self = self.add_account(account);
304 }
305
306 self
307 }
308
309 /// Add a single new [`Account`] to the swarm. Use [`Self::add_accounts`] to
310 /// add multiple accounts at a time.
311 ///
312 /// This will make the state for this client be the default, use
313 /// [`Self::add_account_with_state`] to avoid that.
314 #[must_use]
315 pub fn add_account(self, account: Account) -> Self
316 where
317 S: Default,
318 {
319 self.add_account_with_state_and_opts(account, S::default(), JoinOpts::default())
320 }
321
322 /// Add an account with a custom initial state. Use just
323 /// [`Self::add_account`] to use the Default implementation for the state.
324 #[must_use]
325 pub fn add_account_with_state(self, account: Account, state: S) -> Self {
326 self.add_account_with_state_and_opts(account, state, JoinOpts::default())
327 }
328
329 /// Add an account with a custom initial state. Use just
330 /// [`Self::add_account`] to use the Default implementation for the state.
331 #[must_use]
332 pub fn add_account_with_opts(self, account: Account, opts: JoinOpts) -> Self
333 where
334 S: Default,
335 {
336 self.add_account_with_state_and_opts(account, S::default(), opts)
337 }
338
339 /// Same as [`Self::add_account_with_state`], but allow passing in custom
340 /// join options.
341 #[must_use]
342 pub fn add_account_with_state_and_opts(
343 mut self,
344 account: Account,
345 state: S,
346 join_opts: JoinOpts,
347 ) -> Self {
348 self.accounts.push((account, join_opts));
349 self.states.push(state);
350 self
351 }
352
353 /// Set the swarm state instead of initializing defaults.
354 #[must_use]
355 pub fn set_swarm_state(mut self, swarm_state: SS) -> Self {
356 self.swarm_state = swarm_state;
357 self
358 }
359
360 /// Add one or more plugins to this swarm.
361 ///
362 /// See [`Self::new_without_plugins`] to learn how to disable default
363 /// plugins.
364 #[must_use]
365 pub fn add_plugins<M>(mut self, plugins: impl Plugins<M>) -> Self {
366 self.app.add_plugins(plugins);
367 self
368 }
369
370 /// Set how long we should wait between each bot joining the server.
371 ///
372 /// By default, every bot will connect at the same time. If you set this
373 /// field, however, the bots will wait for the previous one to have
374 /// connected and *then* they'll wait the given duration.
375 #[must_use]
376 pub fn join_delay(mut self, delay: Duration) -> Self {
377 self.join_delay = Some(delay);
378 self
379 }
380
381 /// Configures the auto-reconnection behavior for our bots.
382 ///
383 /// If this is `Some`, then it'll set the default reconnection delay for our
384 /// bots (how long they'll wait after being kicked before they try
385 /// rejoining). if it's `None`, then auto-reconnecting will be disabled.
386 ///
387 /// If this function isn't called, then our clients will reconnect after
388 /// [`DEFAULT_RECONNECT_DELAY`].
389 #[must_use]
390 pub fn reconnect_after(mut self, delay: impl Into<Option<Duration>>) -> Self {
391 self.reconnect_after = delay.into();
392 self
393 }
394
395 /// Build this `SwarmBuilder` into an actual [`Swarm`] and join the given
396 /// server.
397 ///
398 /// The `address` argument can be a `&str`, [`ServerAddress`], or anything
399 /// that implements `TryInto<ServerAddress>`.
400 ///
401 /// [`ServerAddress`]: azalea_protocol::ServerAddress
402 pub async fn start(self, address: impl TryInto<ServerAddress>) -> Result<AppExit, StartError> {
403 // convert the TryInto<ServerAddress> into a ServerAddress
404 let address: ServerAddress = match address.try_into() {
405 Ok(address) => address,
406 Err(_) => return Err(StartError::InvalidAddress),
407 };
408
409 self.start_with_default_opts(address, JoinOpts::default())
410 .await
411 }
412
413 /// Do the same as [`Self::start`], but allow passing in default join
414 /// options for the bots.
415 pub async fn start_with_default_opts(
416 mut self,
417 address: impl TryInto<ServerAddress>,
418 default_join_opts: JoinOpts,
419 ) -> Result<AppExit, StartError> {
420 assert_eq!(
421 self.accounts.len(),
422 self.states.len(),
423 "There must be exactly one state per bot."
424 );
425
426 debug!("Starting Azalea {}", env!("CARGO_PKG_VERSION"));
427
428 // convert the TryInto<ServerAddress> into a ServerAddress
429 let address = match address.try_into() {
430 Ok(address) => address,
431 Err(_) => return Err(StartError::InvalidAddress),
432 };
433
434 let address: ServerAddress = default_join_opts.custom_address.clone().unwrap_or(address);
435 let resolved_address = if let Some(a) = default_join_opts.custom_resolved_address {
436 a
437 } else {
438 resolver::resolve_address(&address).await?
439 };
440
441 let instance_container = Arc::new(RwLock::new(InstanceContainer::default()));
442
443 // we can't modify the swarm plugins after this
444 let (bots_tx, mut bots_rx) = mpsc::unbounded_channel();
445 let (swarm_tx, mut swarm_rx) = mpsc::unbounded_channel();
446
447 swarm_tx.send(SwarmEvent::Init).unwrap();
448
449 let main_schedule_label = self.app.update_schedule.unwrap();
450
451 let (ecs_lock, start_running_systems, appexit_rx) = start_ecs_runner(&mut self.app);
452
453 let swarm = Swarm {
454 ecs_lock: ecs_lock.clone(),
455
456 resolved_address: Arc::new(RwLock::new(resolved_address)),
457 address: Arc::new(RwLock::new(address)),
458 instance_container,
459
460 bots_tx,
461
462 swarm_tx: swarm_tx.clone(),
463 };
464
465 // run the main schedule so the startup systems run
466 {
467 let mut ecs = ecs_lock.lock();
468 ecs.insert_resource(swarm.clone());
469 ecs.insert_resource(self.swarm_state.clone());
470 if let Some(reconnect_after) = self.reconnect_after {
471 ecs.insert_resource(AutoReconnectDelay {
472 delay: reconnect_after,
473 });
474 } else {
475 ecs.remove_resource::<AutoReconnectDelay>();
476 }
477 ecs.run_schedule(main_schedule_label);
478 ecs.clear_trackers();
479 }
480
481 // only do this after we inserted the Swarm and state resources to avoid errors
482 // where Res<Swarm> is inaccessible
483 start_running_systems();
484
485 // SwarmBuilder (self) isn't Send so we have to take all the things we need out
486 // of it
487 let swarm_clone = swarm.clone();
488 let join_delay = self.join_delay;
489 let accounts = self.accounts.clone();
490 let states = self.states.clone();
491
492 tokio::spawn(async move {
493 if let Some(join_delay) = join_delay {
494 // if there's a join delay, then join one by one
495 for ((account, bot_join_opts), state) in accounts.iter().zip(states) {
496 let mut join_opts = default_join_opts.clone();
497 join_opts.update(bot_join_opts);
498 let _ = swarm_clone.add_with_opts(account, state, &join_opts).await;
499 tokio::time::sleep(join_delay).await;
500 }
501 } else {
502 // otherwise, join all at once
503 let swarm_borrow = &swarm_clone;
504 join_all(accounts.iter().zip(states).map(
505 |((account, bot_join_opts), state)| async {
506 let mut join_opts = default_join_opts.clone();
507 join_opts.update(bot_join_opts);
508 let _ = swarm_borrow
509 .clone()
510 .add_with_opts(account, state, &join_opts)
511 .await;
512 },
513 ))
514 .await;
515 }
516
517 swarm_tx.send(SwarmEvent::Login).unwrap();
518 });
519
520 let swarm_state = self.swarm_state;
521
522 // Watch swarm_rx and send those events to the swarm_handle.
523 let swarm_clone = swarm.clone();
524 let swarm_handler_task = tokio::spawn(async move {
525 while let Some(event) = swarm_rx.recv().await {
526 if let Some(swarm_handler) = &self.swarm_handler {
527 tokio::spawn((swarm_handler)(
528 swarm_clone.clone(),
529 event,
530 swarm_state.clone(),
531 ));
532 }
533 }
534
535 unreachable!(
536 "The `Swarm` here contains a sender for the `SwarmEvent`s, so swarm_rx.recv() will never fail"
537 );
538 });
539
540 // bot events
541 let client_handler_task = tokio::spawn(async move {
542 while let Some((Some(first_event), first_bot)) = bots_rx.recv().await {
543 if bots_rx.len() > 1_000 {
544 static WARNED: AtomicBool = AtomicBool::new(false);
545 if !WARNED.swap(true, atomic::Ordering::Relaxed) {
546 warn!("the Client Event channel has more than 1000 items!")
547 }
548 }
549
550 if let Some(handler) = &self.handler {
551 let ecs_mutex = first_bot.ecs.clone();
552 let mut ecs = ecs_mutex.lock();
553 let mut query = ecs.query::<Option<&S>>();
554 let Ok(Some(first_bot_state)) = query.get(&ecs, first_bot.entity) else {
555 error!(
556 "the first bot ({} / {}) is missing the required state component! none of the client handler functions will be called.",
557 first_bot.username(),
558 first_bot.entity
559 );
560 continue;
561 };
562 let first_bot_entity = first_bot.entity;
563 let first_bot_state = first_bot_state.clone();
564
565 tokio::spawn((handler)(first_bot, first_event, first_bot_state.clone()));
566
567 // this makes it not have to keep locking the ecs
568 let mut states = HashMap::new();
569 states.insert(first_bot_entity, first_bot_state);
570 while let Ok((Some(event), bot)) = bots_rx.try_recv() {
571 let state = match states.entry(bot.entity) {
572 hash_map::Entry::Occupied(e) => e.into_mut(),
573 hash_map::Entry::Vacant(e) => {
574 let Ok(Some(state)) = query.get(&ecs, bot.entity) else {
575 error!(
576 "one of our bots ({} / {}) is missing the required state component! its client handler function will not be called.",
577 bot.username(),
578 bot.entity
579 );
580 continue;
581 };
582 let state = state.clone();
583 e.insert(state)
584 }
585 };
586 tokio::spawn((handler)(bot, event, state.clone()));
587 }
588 }
589 }
590 });
591
592 let appexit = appexit_rx
593 .await
594 .expect("appexit_tx shouldn't be dropped by the ECS runner before sending");
595
596 swarm_handler_task.abort();
597 client_handler_task.abort();
598
599 Ok(appexit)
600 }
601}
602
603impl Default for SwarmBuilder<NoState, NoSwarmState, (), ()> {
604 fn default() -> Self {
605 Self::new()
606 }
607}
608
609/// An event about something that doesn't have to do with a single bot.
610#[derive(Clone, Debug)]
611#[non_exhaustive]
612pub enum SwarmEvent {
613 /// All the bots in the swarm have successfully joined the server.
614 Login,
615 /// The swarm was created. This is only fired once, and it's guaranteed to
616 /// be the first event to fire.
617 Init,
618 /// A bot got disconnected from the server.
619 ///
620 /// If you'd like to implement special auto-reconnect behavior beyond what's
621 /// built-in, you can disable that with [`SwarmBuilder::reconnect_delay`]
622 /// and then call [`Swarm::add_with_opts`] with the account and options
623 /// from this event.
624 ///
625 /// [`SwarmBuilder::reconnect_delay`]: crate::swarm::SwarmBuilder::reconnect_after
626 Disconnect(Box<Account>, JoinOpts),
627 /// At least one bot received a chat message.
628 Chat(ChatPacket),
629}
630
631pub type SwarmHandleFn<SS, Fut> = fn(Swarm, SwarmEvent, SS) -> Fut;
632pub type BoxSwarmHandleFn<SS, R> =
633 Box<dyn Fn(Swarm, SwarmEvent, SS) -> BoxFuture<'static, R> + Send + Sync>;
634
635/// Make a bot [`Swarm`].
636///
637/// [`Swarm`]: struct.Swarm.html
638///
639/// # Examples
640/// ```rust,no_run
641/// use azalea::{prelude::*, swarm::prelude::*};
642/// use std::time::Duration;
643///
644/// #[derive(Default, Clone, Component)]
645/// struct State {}
646///
647/// #[derive(Default, Clone, Resource)]
648/// struct SwarmState {}
649///
650/// #[tokio::main(flavor = "current_thread")]
651/// async fn main() {
652/// let mut accounts = Vec::new();
653/// let mut states = Vec::new();
654///
655/// for i in 0..10 {
656/// accounts.push(Account::offline(&format!("bot{i}")));
657/// states.push(State::default());
658/// }
659///
660/// SwarmBuilder::new()
661/// .add_accounts(accounts.clone())
662/// .set_handler(handle)
663/// .set_swarm_handler(swarm_handle)
664/// .join_delay(Duration::from_millis(1000))
665/// .start("localhost")
666/// .await
667/// .unwrap();
668/// }
669///
670/// async fn handle(bot: Client, event: Event, _state: State) -> anyhow::Result<()> {
671/// match &event {
672/// _ => {}
673/// }
674/// Ok(())
675/// }
676///
677/// async fn swarm_handle(
678/// mut swarm: Swarm,
679/// event: SwarmEvent,
680/// _state: SwarmState,
681/// ) -> anyhow::Result<()> {
682/// match &event {
683/// SwarmEvent::Chat(m) => {
684/// println!("{}", m.message().to_ansi());
685/// }
686/// _ => {}
687/// }
688/// Ok(())
689/// }
690impl Swarm {
691 /// Add a new account to the swarm. You can remove it later by calling
692 /// [`Client::disconnect`].
693 ///
694 /// # Errors
695 ///
696 /// Returns an error if the server's address could not be resolved.
697 pub async fn add<S: Component + Clone>(
698 &self,
699 account: &Account,
700 state: S,
701 ) -> Result<Client, JoinError> {
702 self.add_with_opts(account, state, &JoinOpts::default())
703 .await
704 }
705 /// Add a new account to the swarm, using custom options. This is useful if
706 /// you want bots in the same swarm to connect to different addresses.
707 /// Usually you'll just want [`Self::add`] though.
708 ///
709 /// # Errors
710 ///
711 /// Returns an error if the server's address could not be resolved.
712 pub async fn add_with_opts<S: Component + Clone>(
713 &self,
714 account: &Account,
715 state: S,
716 join_opts: &JoinOpts,
717 ) -> Result<Client, JoinError> {
718 debug!(
719 "add_with_opts called for account {} with opts {join_opts:?}",
720 account.username
721 );
722
723 let address = join_opts
724 .custom_address
725 .clone()
726 .unwrap_or_else(|| self.address.read().clone());
727 let resolved_address = join_opts
728 .custom_resolved_address
729 .unwrap_or_else(|| *self.resolved_address.read());
730 let proxy = join_opts.proxy.clone();
731
732 let (tx, rx) = mpsc::unbounded_channel();
733
734 let client = Client::start_client(StartClientOpts {
735 ecs_lock: self.ecs_lock.clone(),
736 account: account.clone(),
737 connect_opts: ConnectOpts {
738 address,
739 resolved_address,
740 proxy,
741 },
742 event_sender: Some(tx),
743 })
744 .await;
745 // add the state to the client
746 {
747 let mut ecs = self.ecs_lock.lock();
748 ecs.entity_mut(client.entity).insert(state);
749 }
750
751 let cloned_bot = client.clone();
752 let swarm_tx = self.swarm_tx.clone();
753 let bots_tx = self.bots_tx.clone();
754
755 let join_opts = join_opts.clone();
756 tokio::spawn(Self::event_copying_task(
757 rx, swarm_tx, bots_tx, cloned_bot, join_opts,
758 ));
759
760 Ok(client)
761 }
762
763 /// Copy the events from a client's receiver into bots_tx, until the bot is
764 /// removed from the ECS.
765 async fn event_copying_task(
766 mut rx: mpsc::UnboundedReceiver<Event>,
767 swarm_tx: mpsc::UnboundedSender<SwarmEvent>,
768 bots_tx: mpsc::UnboundedSender<(Option<Event>, Client)>,
769 bot: Client,
770 join_opts: JoinOpts,
771 ) {
772 while let Some(event) = rx.recv().await {
773 if rx.len() > 1_000 {
774 static WARNED_1_000: AtomicBool = AtomicBool::new(false);
775 if !WARNED_1_000.swap(true, atomic::Ordering::Relaxed) {
776 warn!(
777 "the client's Event channel has more than 1,000 items! this is probably fine but if you're concerned about it, maybe consider disabling the packet-event feature in azalea to reduce the number of events?"
778 )
779 }
780
781 if rx.len() > 10_000 {
782 static WARNED_10_000: AtomicBool = AtomicBool::new(false);
783 if !WARNED_10_000.swap(true, atomic::Ordering::Relaxed) {
784 warn!("the client's Event channel has more than 10,000 items!!")
785 }
786
787 if rx.len() > 100_000 {
788 static WARNED_100_000: AtomicBool = AtomicBool::new(false);
789 if !WARNED_100_000.swap(true, atomic::Ordering::Relaxed) {
790 warn!("the client's Event channel has more than 100,000 items!!!")
791 }
792
793 if rx.len() > 1_000_000 {
794 static WARNED_1_000_000: AtomicBool = AtomicBool::new(false);
795 if !WARNED_1_000_000.swap(true, atomic::Ordering::Relaxed) {
796 warn!(
797 "the client's Event channel has more than 1,000,000 items!!!! your code is almost certainly leaking memory"
798 )
799 }
800 }
801 }
802 }
803 }
804
805 if let Event::Disconnect(_) = event {
806 debug!(
807 "sending SwarmEvent::Disconnect due to receiving an Event::Disconnect from client {}",
808 bot.entity
809 );
810 let account = bot
811 .get_component::<Account>()
812 .expect("bot is missing required Account component");
813 swarm_tx
814 .send(SwarmEvent::Disconnect(Box::new(account), join_opts.clone()))
815 .unwrap();
816 }
817
818 // we can't handle events here (since we can't copy the handler),
819 // they're handled above in SwarmBuilder::start
820 if let Err(e) = bots_tx.send((Some(event), bot.clone())) {
821 error!(
822 "Error sending event to swarm, aborting event_copying_task for {}: {e}",
823 bot.entity
824 );
825 break;
826 }
827 }
828 debug!(
829 "client sender ended for {}, this won't trigger SwarmEvent::Disconnect unless the client already sent its own disconnect event",
830 bot.entity
831 );
832 }
833
834 /// Add a new account to the swarm, retrying if it couldn't join. This will
835 /// run forever until the bot joins or the task is aborted.
836 ///
837 /// This does exponential backoff (though very limited), starting at 5
838 /// seconds and doubling up to 15 seconds.
839 #[deprecated(note = "azalea has auto-reconnect functionality built-in now, use `add` instead")]
840 pub async fn add_and_retry_forever<S: Component + Clone>(
841 &self,
842 account: &Account,
843 state: S,
844 ) -> Client {
845 #[allow(deprecated)]
846 self.add_and_retry_forever_with_opts(account, state, &JoinOpts::default())
847 .await
848 }
849
850 /// Same as [`Self::add_and_retry_forever`], but allow passing custom join
851 /// options.
852 #[deprecated(
853 note = "azalea has auto-reconnect functionality built-in now, use `add_with_opts` instead"
854 )]
855 pub async fn add_and_retry_forever_with_opts<S: Component + Clone>(
856 &self,
857 account: &Account,
858 state: S,
859 opts: &JoinOpts,
860 ) -> Client {
861 let mut disconnects: u32 = 0;
862 loop {
863 match self.add_with_opts(account, state.clone(), opts).await {
864 Ok(bot) => return bot,
865 Err(e) => {
866 disconnects += 1;
867 let delay = (Duration::from_secs(5) * 2u32.pow(disconnects.min(16)))
868 .min(Duration::from_secs(15));
869 let username = account.username.clone();
870
871 error!("Error joining as {username}: {e}. Waiting {delay:?} and trying again.");
872
873 sleep(delay).await;
874 }
875 }
876 }
877 }
878
879 /// Get an array of ECS [`Entity`]s for all [`LocalEntity`]s in our world.
880 /// This will include clients that were disconnected without being removed
881 /// from the ECS.
882 ///
883 /// [`LocalEntity`]: azalea_entity::LocalEntity
884 pub fn client_entities(&self) -> Box<[Entity]> {
885 let mut ecs = self.ecs_lock.lock();
886 let mut query = ecs.query_filtered::<Entity, With<LocalEntity>>();
887 query.iter(&ecs).collect::<Box<[Entity]>>()
888 }
889}
890
891impl IntoIterator for Swarm {
892 type Item = Client;
893 type IntoIter = std::vec::IntoIter<Self::Item>;
894
895 /// Iterate over the bots in this swarm.
896 ///
897 /// ```rust,no_run
898 /// # use azalea::{prelude::*, swarm::prelude::*};
899 /// #[derive(Component, Clone)]
900 /// # pub struct State;
901 /// # fn example(swarm: Swarm) {
902 /// for bot in swarm {
903 /// let state = bot.component::<State>();
904 /// // ...
905 /// }
906 /// # }
907 /// ```
908 fn into_iter(self) -> Self::IntoIter {
909 let client_entities = self.client_entities();
910
911 client_entities
912 .into_iter()
913 .map(|entity| Client::new(entity, self.ecs_lock.clone()))
914 .collect::<Box<[Client]>>()
915 .into_iter()
916 }
917}
918
919/// This plugin group will add all the default plugins necessary for swarms to
920/// work.
921pub struct DefaultSwarmPlugins;
922
923impl PluginGroup for DefaultSwarmPlugins {
924 fn build(self) -> PluginGroupBuilder {
925 PluginGroupBuilder::start::<Self>()
926 .add(chat::SwarmChatPlugin)
927 .add(events::SwarmPlugin)
928 }
929}
930
931/// A marker that can be used in place of a SwarmState in [`SwarmBuilder`]. You
932/// probably don't need to use this manually since the compiler will infer it
933/// for you.
934#[derive(Resource, Clone, Default)]
935pub struct NoSwarmState;