azalea/swarm/mod.rs
1//! Swarms are a way to conveniently control many bots.
2//!
3//! See [`Swarm`] for more information.
4
5mod chat;
6mod events;
7pub mod prelude;
8
9use std::{
10 collections::{HashMap, hash_map},
11 future::Future,
12 net::SocketAddr,
13 sync::{
14 Arc,
15 atomic::{self, AtomicBool},
16 },
17 time::Duration,
18};
19
20use azalea_client::{
21 Account, Client, DefaultPlugins, Event, JoinError, StartClientOpts, chat::ChatPacket,
22 start_ecs_runner,
23};
24use azalea_protocol::{ServerAddress, resolver};
25use azalea_world::InstanceContainer;
26use bevy_app::{App, PluginGroup, PluginGroupBuilder, Plugins};
27use bevy_ecs::{component::Component, entity::Entity, system::Resource, world::World};
28use futures::future::{BoxFuture, join_all};
29use parking_lot::{Mutex, RwLock};
30use tokio::sync::mpsc;
31use tracing::{debug, error, warn};
32
33use crate::{BoxHandleFn, DefaultBotPlugins, HandleFn, JoinOpts, NoState, StartError};
34
35/// A swarm is a way to conveniently control many bots at once, while also
36/// being able to control bots at an individual level when desired.
37///
38/// It can safely be cloned, so there should be no need to wrap them in a Mutex.
39///
40/// Swarms are created from [`SwarmBuilder`].
41///
42/// Clients can be added to the swarm later via [`Swarm::add`], and can be
43/// removed with [`Client::disconnect`].
44#[derive(Clone, Resource)]
45pub struct Swarm {
46 pub ecs_lock: Arc<Mutex<World>>,
47
48 bots: Arc<Mutex<HashMap<Entity, Client>>>,
49
50 // the address is public and mutable so plugins can change it
51 pub resolved_address: Arc<RwLock<SocketAddr>>,
52 pub address: Arc<RwLock<ServerAddress>>,
53
54 pub instance_container: Arc<RwLock<InstanceContainer>>,
55
56 bots_tx: mpsc::UnboundedSender<(Option<Event>, Client)>,
57 swarm_tx: mpsc::UnboundedSender<SwarmEvent>,
58}
59
60/// Create a new [`Swarm`].
61///
62/// The generics of this struct stand for the following:
63/// - S: State
64/// - SS: Swarm State
65/// - R: Return type of the handler
66/// - SR: Return type of the swarm handler
67///
68/// You shouldn't have to manually set them though, they'll be inferred for you.
69pub struct SwarmBuilder<S, SS, R, SR>
70where
71 S: Send + Sync + Clone + Component + 'static,
72 SS: Default + Send + Sync + Clone + Resource + 'static,
73{
74 pub(crate) app: App,
75 /// The accounts and proxies that are going to join the server.
76 pub(crate) accounts: Vec<(Account, JoinOpts)>,
77 /// The individual bot states. This must be the same length as `accounts`,
78 /// since each bot gets one state.
79 pub(crate) states: Vec<S>,
80 /// The state for the overall swarm.
81 pub(crate) swarm_state: SS,
82 /// The function that's called every time a bot receives an [`Event`].
83 pub(crate) handler: Option<BoxHandleFn<S, R>>,
84 /// The function that's called every time the swarm receives a
85 /// [`SwarmEvent`].
86 pub(crate) swarm_handler: Option<BoxSwarmHandleFn<SS, SR>>,
87
88 /// How long we should wait between each bot joining the server. Set to
89 /// None to have every bot connect at the same time. None is different than
90 /// a duration of 0, since if a duration is present the bots will wait for
91 /// the previous one to be ready.
92 pub(crate) join_delay: Option<std::time::Duration>,
93}
94impl SwarmBuilder<NoState, NoSwarmState, (), ()> {
95 /// Start creating the swarm.
96 #[must_use]
97 pub fn new() -> Self {
98 Self::new_without_plugins()
99 .add_plugins(DefaultPlugins)
100 .add_plugins(DefaultBotPlugins)
101 .add_plugins(DefaultSwarmPlugins)
102 }
103
104 /// [`Self::new`] but without adding the plugins by default. This is useful
105 /// if you want to disable a default plugin.
106 ///
107 /// You **must** add [`DefaultPlugins`], [`DefaultBotPlugins`], and
108 /// [`DefaultSwarmPlugins`] to this.
109 ///
110 /// ```
111 /// # use azalea::{prelude::*, swarm::prelude::*};
112 /// use azalea::{app::PluginGroup, DefaultBotPlugins, DefaultPlugins, swarm::{DefaultSwarmPlugins}};
113 /// use bevy_log::LogPlugin;
114 ///
115 /// let swarm_builder = SwarmBuilder::new_without_plugins()
116 /// .add_plugins(DefaultPlugins.build().disable::<LogPlugin>())
117 /// .add_plugins(DefaultBotPlugins)
118 /// .add_plugins(DefaultSwarmPlugins);
119 /// # swarm_builder.set_handler(handle).set_swarm_handler(swarm_handle);
120 /// # #[derive(Component, Resource, Clone, Default)]
121 /// # pub struct State;
122 /// # async fn handle(mut bot: Client, event: Event, state: State) -> anyhow::Result<()> {
123 /// # Ok(())
124 /// # }
125 /// # async fn swarm_handle(swarm: Swarm, event: SwarmEvent, state: State) -> anyhow::Result<()> {
126 /// # Ok(())
127 /// # }
128 /// ```
129 #[must_use]
130 pub fn new_without_plugins() -> Self {
131 SwarmBuilder {
132 // we create the app here so plugins can add onto it.
133 // the schedules won't run until [`Self::start`] is called.
134 app: App::new(),
135 accounts: Vec::new(),
136 states: Vec::new(),
137 swarm_state: NoSwarmState,
138 handler: None,
139 swarm_handler: None,
140 join_delay: None,
141 }
142 }
143}
144
145impl<SS, SR> SwarmBuilder<NoState, SS, (), SR>
146where
147 SS: Default + Send + Sync + Clone + Resource + 'static,
148{
149 /// Set the function that's called every time a bot receives an [`Event`].
150 /// This is the way to handle normal per-bot events.
151 ///
152 /// Currently you can have up to one handler.
153 ///
154 /// ```
155 /// # use azalea::{prelude::*, swarm::prelude::*};
156 /// # let swarm_builder = SwarmBuilder::new().set_swarm_handler(swarm_handle);
157 /// swarm_builder.set_handler(handle);
158 ///
159 /// #[derive(Component, Default, Clone)]
160 /// struct State {}
161 /// async fn handle(mut bot: Client, event: Event, state: State) -> anyhow::Result<()> {
162 /// Ok(())
163 /// }
164 ///
165 /// # #[derive(Resource, Default, Clone)]
166 /// # struct SwarmState {}
167 /// # async fn swarm_handle(
168 /// # mut swarm: Swarm,
169 /// # event: SwarmEvent,
170 /// # state: SwarmState,
171 /// # ) -> anyhow::Result<()> {
172 /// # Ok(())
173 /// # }
174 /// ```
175 #[must_use]
176 pub fn set_handler<S, Fut, R>(self, handler: HandleFn<S, Fut>) -> SwarmBuilder<S, SS, R, SR>
177 where
178 Fut: Future<Output = R> + Send + 'static,
179 S: Send + Sync + Clone + Component + Default + 'static,
180 {
181 SwarmBuilder {
182 handler: Some(Box::new(move |bot, event, state: S| {
183 Box::pin(handler(bot, event, state))
184 })),
185 // if we added accounts before the State was set, we've gotta set it to the default now
186 states: vec![S::default(); self.accounts.len()],
187 app: self.app,
188 ..self
189 }
190 }
191}
192
193impl<S, R> SwarmBuilder<S, NoSwarmState, R, ()>
194where
195 S: Send + Sync + Clone + Component + 'static,
196{
197 /// Set the function that's called every time the swarm receives a
198 /// [`SwarmEvent`]. This is the way to handle global swarm events.
199 ///
200 /// Currently you can have up to one swarm handler.
201 ///
202 /// ```
203 /// # use azalea::{prelude::*, swarm::prelude::*};
204 /// # let swarm_builder = SwarmBuilder::new().set_handler(handle);
205 /// swarm_builder.set_swarm_handler(swarm_handle);
206 ///
207 /// # #[derive(Component, Default, Clone)]
208 /// # struct State {}
209 ///
210 /// # async fn handle(mut bot: Client, event: Event, state: State) -> anyhow::Result<()> {
211 /// # Ok(())
212 /// # }
213 ///
214 /// #[derive(Resource, Default, Clone)]
215 /// struct SwarmState {}
216 /// async fn swarm_handle(
217 /// mut swarm: Swarm,
218 /// event: SwarmEvent,
219 /// state: SwarmState,
220 /// ) -> anyhow::Result<()> {
221 /// Ok(())
222 /// }
223 /// ```
224 #[must_use]
225 pub fn set_swarm_handler<SS, Fut, SR>(
226 self,
227 handler: SwarmHandleFn<SS, Fut>,
228 ) -> SwarmBuilder<S, SS, R, SR>
229 where
230 SS: Default + Send + Sync + Clone + Resource + 'static,
231 Fut: Future<Output = SR> + Send + 'static,
232 {
233 SwarmBuilder {
234 handler: self.handler,
235 app: self.app,
236 accounts: self.accounts,
237 states: self.states,
238 swarm_state: SS::default(),
239 swarm_handler: Some(Box::new(move |swarm, event, state| {
240 Box::pin(handler(swarm, event, state))
241 })),
242 join_delay: self.join_delay,
243 }
244 }
245}
246
247impl<S, SS, R, SR> SwarmBuilder<S, SS, R, SR>
248where
249 S: Send + Sync + Clone + Component + 'static,
250 SS: Default + Send + Sync + Clone + Resource + 'static,
251 R: Send + 'static,
252 SR: Send + 'static,
253{
254 /// Add a vec of [`Account`]s to the swarm.
255 ///
256 /// Use [`Self::add_account`] to only add one account. If you want the
257 /// clients to have different default states, add them one at a time with
258 /// [`Self::add_account_with_state`].
259 ///
260 /// By default, every account will join at the same time, you can add a
261 /// delay with [`Self::join_delay`].
262 #[must_use]
263 pub fn add_accounts(mut self, accounts: Vec<Account>) -> Self
264 where
265 S: Default,
266 {
267 for account in accounts {
268 self = self.add_account(account);
269 }
270
271 self
272 }
273
274 /// Add a single new [`Account`] to the swarm. Use [`Self::add_accounts`] to
275 /// add multiple accounts at a time.
276 ///
277 /// This will make the state for this client be the default, use
278 /// [`Self::add_account_with_state`] to avoid that.
279 #[must_use]
280 pub fn add_account(self, account: Account) -> Self
281 where
282 S: Default,
283 {
284 self.add_account_with_state_and_opts(account, S::default(), JoinOpts::default())
285 }
286
287 /// Add an account with a custom initial state. Use just
288 /// [`Self::add_account`] to use the Default implementation for the state.
289 #[must_use]
290 pub fn add_account_with_state(self, account: Account, state: S) -> Self {
291 self.add_account_with_state_and_opts(account, state, JoinOpts::default())
292 }
293
294 /// Add an account with a custom initial state. Use just
295 /// [`Self::add_account`] to use the Default implementation for the state.
296 #[must_use]
297 pub fn add_account_with_opts(self, account: Account, opts: JoinOpts) -> Self
298 where
299 S: Default,
300 {
301 self.add_account_with_state_and_opts(account, S::default(), opts)
302 }
303
304 /// Same as [`Self::add_account_with_state`], but allow passing in custom
305 /// join options.
306 #[must_use]
307 pub fn add_account_with_state_and_opts(
308 mut self,
309 account: Account,
310 state: S,
311 join_opts: JoinOpts,
312 ) -> Self {
313 self.accounts.push((account, join_opts));
314 self.states.push(state);
315 self
316 }
317
318 /// Set the swarm state instead of initializing defaults.
319 #[must_use]
320 pub fn set_swarm_state(mut self, swarm_state: SS) -> Self {
321 self.swarm_state = swarm_state;
322 self
323 }
324
325 /// Add one or more plugins to this swarm.
326 #[must_use]
327 pub fn add_plugins<M>(mut self, plugins: impl Plugins<M>) -> Self {
328 self.app.add_plugins(plugins);
329 self
330 }
331
332 /// Set how long we should wait between each bot joining the server.
333 ///
334 /// By default, every bot will connect at the same time. If you set this
335 /// field, however, the bots will wait for the previous one to have
336 /// connected and *then* they'll wait the given duration.
337 #[must_use]
338 pub fn join_delay(mut self, delay: std::time::Duration) -> Self {
339 self.join_delay = Some(delay);
340 self
341 }
342
343 /// Build this `SwarmBuilder` into an actual [`Swarm`] and join the given
344 /// server.
345 ///
346 /// The `address` argument can be a `&str`, [`ServerAddress`], or anything
347 /// that implements `TryInto<ServerAddress>`.
348 ///
349 /// [`ServerAddress`]: azalea_protocol::ServerAddress
350 pub async fn start(self, address: impl TryInto<ServerAddress>) -> Result<!, StartError> {
351 // convert the TryInto<ServerAddress> into a ServerAddress
352 let address: ServerAddress = match address.try_into() {
353 Ok(address) => address,
354 Err(_) => return Err(StartError::InvalidAddress),
355 };
356
357 self.start_with_default_opts(address, JoinOpts::default())
358 .await
359 }
360
361 /// Do the same as [`Self::start`], but allow passing in default join
362 /// options for the bots.
363 pub async fn start_with_default_opts(
364 self,
365 address: impl TryInto<ServerAddress>,
366 default_join_opts: JoinOpts,
367 ) -> Result<!, StartError> {
368 assert_eq!(
369 self.accounts.len(),
370 self.states.len(),
371 "There must be exactly one state per bot."
372 );
373
374 debug!("Starting Azalea {}", env!("CARGO_PKG_VERSION"));
375
376 // convert the TryInto<ServerAddress> into a ServerAddress
377 let address = match address.try_into() {
378 Ok(address) => address,
379 Err(_) => return Err(StartError::InvalidAddress),
380 };
381
382 let address: ServerAddress = default_join_opts.custom_address.clone().unwrap_or(address);
383 let resolved_address = if let Some(a) = default_join_opts.custom_resolved_address {
384 a
385 } else {
386 resolver::resolve_address(&address).await?
387 };
388
389 let instance_container = Arc::new(RwLock::new(InstanceContainer::default()));
390
391 // we can't modify the swarm plugins after this
392 let (bots_tx, mut bots_rx) = mpsc::unbounded_channel();
393 let (swarm_tx, mut swarm_rx) = mpsc::unbounded_channel();
394
395 swarm_tx.send(SwarmEvent::Init).unwrap();
396
397 let main_schedule_label = self.app.main().update_schedule.unwrap();
398
399 let ecs_lock = start_ecs_runner(self.app);
400
401 let swarm = Swarm {
402 ecs_lock: ecs_lock.clone(),
403 bots: Arc::new(Mutex::new(HashMap::new())),
404
405 resolved_address: Arc::new(RwLock::new(resolved_address)),
406 address: Arc::new(RwLock::new(address)),
407 instance_container,
408
409 bots_tx,
410
411 swarm_tx: swarm_tx.clone(),
412 };
413
414 // run the main schedule so the startup systems run
415 {
416 let mut ecs = ecs_lock.lock();
417 ecs.insert_resource(swarm.clone());
418 ecs.insert_resource(self.swarm_state.clone());
419 ecs.run_schedule(main_schedule_label);
420 ecs.clear_trackers();
421 }
422
423 // SwarmBuilder (self) isn't Send so we have to take all the things we need out
424 // of it
425 let swarm_clone = swarm.clone();
426 let join_delay = self.join_delay;
427 let accounts = self.accounts.clone();
428 let states = self.states.clone();
429
430 tokio::spawn(async move {
431 if let Some(join_delay) = join_delay {
432 // if there's a join delay, then join one by one
433 for ((account, bot_join_opts), state) in accounts.iter().zip(states) {
434 let mut join_opts = default_join_opts.clone();
435 join_opts.update(bot_join_opts);
436 swarm_clone
437 .add_and_retry_forever_with_opts(account, state, &join_opts)
438 .await;
439 tokio::time::sleep(join_delay).await;
440 }
441 } else {
442 // otherwise, join all at once
443 let swarm_borrow = &swarm_clone;
444 join_all(accounts.iter().zip(states).map(
445 |((account, bot_join_opts), state)| async {
446 let mut join_opts = default_join_opts.clone();
447 join_opts.update(bot_join_opts);
448 swarm_borrow
449 .clone()
450 .add_and_retry_forever_with_opts(account, state, &join_opts)
451 .await;
452 },
453 ))
454 .await;
455 }
456
457 swarm_tx.send(SwarmEvent::Login).unwrap();
458 });
459
460 let swarm_state = self.swarm_state;
461
462 // Watch swarm_rx and send those events to the swarm_handle.
463 let swarm_clone = swarm.clone();
464 tokio::spawn(async move {
465 while let Some(event) = swarm_rx.recv().await {
466 if let Some(swarm_handler) = &self.swarm_handler {
467 tokio::spawn((swarm_handler)(
468 swarm_clone.clone(),
469 event,
470 swarm_state.clone(),
471 ));
472 }
473 }
474 });
475
476 // bot events
477 while let Some((Some(first_event), first_bot)) = bots_rx.recv().await {
478 if bots_rx.len() > 1_000 {
479 static WARNED: AtomicBool = AtomicBool::new(false);
480 if !WARNED.swap(true, atomic::Ordering::Relaxed) {
481 warn!("the Client Event channel has more than 1000 items!")
482 }
483 }
484
485 if let Some(handler) = &self.handler {
486 let ecs_mutex = first_bot.ecs.clone();
487 let mut ecs = ecs_mutex.lock();
488 let mut query = ecs.query::<Option<&S>>();
489 let Ok(Some(first_bot_state)) = query.get(&mut ecs, first_bot.entity) else {
490 error!(
491 "the first bot ({} / {}) is missing the required state component! none of the client handler functions will be called.",
492 first_bot.username(),
493 first_bot.entity
494 );
495 continue;
496 };
497 let first_bot_entity = first_bot.entity;
498 let first_bot_state = first_bot_state.clone();
499
500 tokio::spawn((handler)(first_bot, first_event, first_bot_state.clone()));
501
502 // this makes it not have to keep locking the ecs
503 let mut states = HashMap::new();
504 states.insert(first_bot_entity, first_bot_state);
505 while let Ok((Some(event), bot)) = bots_rx.try_recv() {
506 let state = match states.entry(bot.entity) {
507 hash_map::Entry::Occupied(e) => e.into_mut(),
508 hash_map::Entry::Vacant(e) => {
509 let Ok(Some(state)) = query.get(&mut ecs, bot.entity) else {
510 error!(
511 "one of our bots ({} / {}) is missing the required state component! its client handler function will not be called.",
512 bot.username(),
513 bot.entity
514 );
515 continue;
516 };
517 let state = state.clone();
518 e.insert(state)
519 }
520 };
521 tokio::spawn((handler)(bot, event, state.clone()));
522 }
523 }
524 }
525
526 unreachable!(
527 "bots_rx.recv() should never be None because the bots_tx channel is never closed"
528 );
529 }
530}
531
532impl Default for SwarmBuilder<NoState, NoSwarmState, (), ()> {
533 fn default() -> Self {
534 Self::new()
535 }
536}
537
538/// An event about something that doesn't have to do with a single bot.
539#[derive(Clone, Debug)]
540#[non_exhaustive]
541pub enum SwarmEvent {
542 /// All the bots in the swarm have successfully joined the server.
543 Login,
544 /// The swarm was created. This is only fired once, and it's guaranteed to
545 /// be the first event to fire.
546 Init,
547 /// A bot got disconnected from the server.
548 ///
549 /// You can implement an auto-reconnect by calling [`Swarm::add_with_opts`]
550 /// with the account and options from this event.
551 Disconnect(Box<Account>, JoinOpts),
552 /// At least one bot received a chat message.
553 Chat(ChatPacket),
554}
555
556pub type SwarmHandleFn<SS, Fut> = fn(Swarm, SwarmEvent, SS) -> Fut;
557pub type BoxSwarmHandleFn<SS, R> =
558 Box<dyn Fn(Swarm, SwarmEvent, SS) -> BoxFuture<'static, R> + Send>;
559
560/// Make a bot [`Swarm`].
561///
562/// [`Swarm`]: struct.Swarm.html
563///
564/// # Examples
565/// ```rust,no_run
566/// use azalea::{prelude::*, swarm::prelude::*};
567/// use std::time::Duration;
568///
569/// #[derive(Default, Clone, Component)]
570/// struct State {}
571///
572/// #[derive(Default, Clone, Resource)]
573/// struct SwarmState {}
574///
575/// #[tokio::main]
576/// async fn main() {
577/// let mut accounts = Vec::new();
578/// let mut states = Vec::new();
579///
580/// for i in 0..10 {
581/// accounts.push(Account::offline(&format!("bot{i}")));
582/// states.push(State::default());
583/// }
584///
585/// SwarmBuilder::new()
586/// .add_accounts(accounts.clone())
587/// .set_handler(handle)
588/// .set_swarm_handler(swarm_handle)
589/// .join_delay(Duration::from_millis(1000))
590/// .start("localhost")
591/// .await
592/// .unwrap();
593/// }
594///
595/// async fn handle(bot: Client, event: Event, _state: State) -> anyhow::Result<()> {
596/// match &event {
597/// _ => {}
598/// }
599/// Ok(())
600/// }
601///
602/// async fn swarm_handle(
603/// mut swarm: Swarm,
604/// event: SwarmEvent,
605/// _state: SwarmState,
606/// ) -> anyhow::Result<()> {
607/// match &event {
608/// SwarmEvent::Disconnect(account, join_opts) => {
609/// // automatically reconnect after 5 seconds
610/// tokio::time::sleep(Duration::from_secs(5)).await;
611/// swarm.add_with_opts(account, State::default(), join_opts).await?;
612/// }
613/// SwarmEvent::Chat(m) => {
614/// println!("{}", m.message().to_ansi());
615/// }
616/// _ => {}
617/// }
618/// Ok(())
619/// }
620impl Swarm {
621 /// Add a new account to the swarm. You can remove it later by calling
622 /// [`Client::disconnect`].
623 ///
624 /// # Errors
625 ///
626 /// Returns an `Err` if the bot could not do a handshake successfully.
627 pub async fn add<S: Component + Clone>(
628 &self,
629 account: &Account,
630 state: S,
631 ) -> Result<Client, JoinError> {
632 self.add_with_opts(account, state, &JoinOpts::default())
633 .await
634 }
635 /// Add a new account to the swarm, using custom options. This is useful if
636 /// you want bots in the same swarm to connect to different addresses.
637 /// Usually you'll just want [`Self::add`] though.
638 ///
639 /// # Errors
640 ///
641 /// Returns an `Err` if the bot could not do a handshake successfully.
642 pub async fn add_with_opts<S: Component + Clone>(
643 &self,
644 account: &Account,
645 state: S,
646 join_opts: &JoinOpts,
647 ) -> Result<Client, JoinError> {
648 debug!("add_with_opts called for account {}", account.username);
649
650 let address = join_opts
651 .custom_address
652 .clone()
653 .unwrap_or_else(|| self.address.read().clone());
654 let resolved_address = join_opts
655 .custom_resolved_address
656 .unwrap_or_else(|| *self.resolved_address.read());
657
658 let (tx, rx) = mpsc::unbounded_channel();
659
660 let bot = Client::start_client(StartClientOpts {
661 ecs_lock: self.ecs_lock.clone(),
662 account,
663 address: &address,
664 resolved_address: &resolved_address,
665 proxy: join_opts.proxy.clone(),
666 event_sender: Some(tx),
667 })
668 .await?;
669 // add the state to the client
670 {
671 let mut ecs = self.ecs_lock.lock();
672 ecs.entity_mut(bot.entity).insert(state);
673 }
674
675 self.bots.lock().insert(bot.entity, bot.clone());
676
677 let cloned_bots = self.bots.clone();
678 let cloned_bots_tx = self.bots_tx.clone();
679 let cloned_bot = bot.clone();
680 let swarm_tx = self.swarm_tx.clone();
681 let join_opts = join_opts.clone();
682 tokio::spawn(Self::event_copying_task(
683 rx,
684 cloned_bots,
685 cloned_bots_tx,
686 cloned_bot,
687 swarm_tx,
688 join_opts,
689 ));
690
691 Ok(bot)
692 }
693
694 async fn event_copying_task(
695 mut rx: mpsc::UnboundedReceiver<Event>,
696 cloned_bots: Arc<Mutex<HashMap<Entity, Client>>>,
697 cloned_bots_tx: mpsc::UnboundedSender<(Option<Event>, Client)>,
698 cloned_bot: Client,
699 swarm_tx: mpsc::UnboundedSender<SwarmEvent>,
700 join_opts: JoinOpts,
701 ) {
702 while let Some(event) = rx.recv().await {
703 if rx.len() > 1_000 {
704 static WARNED_1_000: AtomicBool = AtomicBool::new(false);
705 if !WARNED_1_000.swap(true, atomic::Ordering::Relaxed) {
706 warn!("the client's Event channel has more than 1000 items!")
707 }
708
709 if rx.len() > 10_000 {
710 static WARNED_10_000: AtomicBool = AtomicBool::new(false);
711 if !WARNED_10_000.swap(true, atomic::Ordering::Relaxed) {
712 warn!("the client's Event channel has more than 10,000 items!!")
713 }
714
715 if rx.len() > 100_000 {
716 static WARNED_100_000: AtomicBool = AtomicBool::new(false);
717 if !WARNED_100_000.swap(true, atomic::Ordering::Relaxed) {
718 warn!("the client's Event channel has more than 100,000 items!!!")
719 }
720
721 if rx.len() > 1_000_000 {
722 static WARNED_1_000_000: AtomicBool = AtomicBool::new(false);
723 if !WARNED_1_000_000.swap(true, atomic::Ordering::Relaxed) {
724 warn!(
725 "the client's Event channel has more than 1,000,000 items!!!! i sincerely hope no one ever sees this warning"
726 )
727 }
728 }
729 }
730 }
731 }
732
733 // we can't handle events here (since we can't copy the handler),
734 // they're handled above in SwarmBuilder::start
735 if let Err(e) = cloned_bots_tx.send((Some(event), cloned_bot.clone())) {
736 error!("Error sending event to swarm: {e}");
737 }
738 }
739 debug!("client sender ended, removing from cloned_bots and sending SwarmEvent::Disconnect");
740
741 cloned_bots.lock().remove(&cloned_bot.entity);
742 let account = cloned_bot
743 .get_component::<Account>()
744 .expect("bot is missing required Account component");
745 swarm_tx
746 .send(SwarmEvent::Disconnect(Box::new(account), join_opts))
747 .unwrap();
748 }
749
750 /// Add a new account to the swarm, retrying if it couldn't join. This will
751 /// run forever until the bot joins or the task is aborted.
752 ///
753 /// This does exponential backoff (though very limited), starting at 5
754 /// seconds and doubling up to 15 seconds.
755 pub async fn add_and_retry_forever<S: Component + Clone>(
756 &self,
757 account: &Account,
758 state: S,
759 ) -> Client {
760 self.add_and_retry_forever_with_opts(account, state, &JoinOpts::default())
761 .await
762 }
763
764 /// Same as [`Self::add_and_retry_forever`], but allow passing custom join
765 /// options.
766 pub async fn add_and_retry_forever_with_opts<S: Component + Clone>(
767 &self,
768 account: &Account,
769 state: S,
770 opts: &JoinOpts,
771 ) -> Client {
772 let mut disconnects = 0;
773 loop {
774 match self.add_with_opts(account, state.clone(), opts).await {
775 Ok(bot) => return bot,
776 Err(e) => {
777 disconnects += 1;
778 let delay = (Duration::from_secs(5) * 2u32.pow(disconnects.min(16)))
779 .min(Duration::from_secs(15));
780 let username = account.username.clone();
781
782 match &e {
783 JoinError::Disconnect { reason } => {
784 error!(
785 "Error joining as {username}, server says: \"{reason}\". Waiting {delay:?} and trying again."
786 );
787 }
788 _ => {
789 error!(
790 "Error joining as {username}: {e}. Waiting {delay:?} and trying again."
791 );
792 }
793 }
794
795 tokio::time::sleep(delay).await;
796 }
797 }
798 }
799 }
800}
801
802impl IntoIterator for Swarm {
803 type Item = Client;
804 type IntoIter = std::vec::IntoIter<Self::Item>;
805
806 /// Iterate over the bots in this swarm.
807 ///
808 /// ```rust,no_run
809 /// # use azalea::{prelude::*, swarm::prelude::*};
810 /// #[derive(Component, Clone)]
811 /// # pub struct State;
812 /// # fn example(swarm: Swarm) {
813 /// for bot in swarm {
814 /// let state = bot.component::<State>();
815 /// // ...
816 /// }
817 /// # }
818 /// ```
819 fn into_iter(self) -> Self::IntoIter {
820 self.bots
821 .lock()
822 .clone()
823 .into_values()
824 .collect::<Vec<_>>()
825 .into_iter()
826 }
827}
828
829/// This plugin group will add all the default plugins necessary for swarms to
830/// work.
831pub struct DefaultSwarmPlugins;
832
833impl PluginGroup for DefaultSwarmPlugins {
834 fn build(self) -> PluginGroupBuilder {
835 PluginGroupBuilder::start::<Self>()
836 .add(chat::SwarmChatPlugin)
837 .add(events::SwarmPlugin)
838 }
839}
840
841/// A marker that can be used in place of a SwarmState in [`SwarmBuilder`]. You
842/// probably don't need to use this manually since the compiler will infer it
843/// for you.
844#[derive(Resource, Clone, Default)]
845pub struct NoSwarmState;