azalea/swarm/mod.rs
1//! Swarms are a way to conveniently control many bots.
2//!
3//! See [`Swarm`] for more information.
4
5mod chat;
6mod events;
7pub mod prelude;
8
9use std::{
10 collections::{HashMap, hash_map},
11 future::Future,
12 mem,
13 net::SocketAddr,
14 sync::{
15 Arc,
16 atomic::{self, AtomicBool},
17 },
18 time::Duration,
19};
20
21use azalea_client::{
22 Account, Client, DefaultPlugins, Event, JoinError, StartClientOpts,
23 auto_reconnect::{AutoReconnectDelay, DEFAULT_RECONNECT_DELAY},
24 chat::ChatPacket,
25 join::ConnectOpts,
26 start_ecs_runner,
27};
28use azalea_entity::LocalEntity;
29use azalea_protocol::{ServerAddress, resolver};
30use azalea_world::InstanceContainer;
31use bevy_app::{App, AppExit, PluginGroup, PluginGroupBuilder, Plugins, SubApp};
32use bevy_ecs::prelude::*;
33use futures::future::{BoxFuture, join_all};
34use parking_lot::{Mutex, RwLock};
35use tokio::{sync::mpsc, time::sleep};
36use tracing::{debug, error, warn};
37
38use crate::{BoxHandleFn, DefaultBotPlugins, HandleFn, JoinOpts, NoState, StartError};
39
40/// A swarm is a way to conveniently control many bots at once, while also
41/// being able to control bots at an individual level when desired.
42///
43/// It can safely be cloned, so there should be no need to wrap them in a Mutex.
44///
45/// Swarms are created from [`SwarmBuilder`].
46///
47/// Clients can be added to the swarm later via [`Swarm::add`], and can be
48/// removed with [`Client::disconnect`].
49#[derive(Clone, Resource)]
50pub struct Swarm {
51 pub ecs_lock: Arc<Mutex<World>>,
52
53 // the address is public and mutable so plugins can change it
54 pub resolved_address: Arc<RwLock<SocketAddr>>,
55 pub address: Arc<RwLock<ServerAddress>>,
56
57 pub instance_container: Arc<RwLock<InstanceContainer>>,
58
59 /// This is used internally to make the client handler function work.
60 bots_tx: mpsc::UnboundedSender<(Option<Event>, Client)>,
61 /// This is used internally to make the swarm handler function work.
62 swarm_tx: mpsc::UnboundedSender<SwarmEvent>,
63}
64
65/// Create a new [`Swarm`].
66///
67/// The generics of this struct stand for the following:
68/// - S: State
69/// - SS: Swarm State
70/// - R: Return type of the handler
71/// - SR: Return type of the swarm handler
72///
73/// You shouldn't have to manually set them though, they'll be inferred for you.
74pub struct SwarmBuilder<S, SS, R, SR>
75where
76 S: Send + Sync + Clone + Component + 'static,
77 SS: Default + Send + Sync + Clone + Resource + 'static,
78 Self: Send,
79{
80 // SubApp is used instead of App to make it Send
81 pub(crate) app: SubApp,
82 /// The accounts and proxies that are going to join the server.
83 pub(crate) accounts: Vec<(Account, JoinOpts)>,
84 /// The individual bot states.
85 ///
86 /// This must be the same length as `accounts`, since each bot gets one
87 /// state.
88 pub(crate) states: Vec<S>,
89 /// The state for the overall swarm.
90 pub(crate) swarm_state: SS,
91 /// The function that's called every time a bot receives an [`Event`].
92 pub(crate) handler: Option<BoxHandleFn<S, R>>,
93 /// The function that's called every time the swarm receives a
94 /// [`SwarmEvent`].
95 pub(crate) swarm_handler: Option<BoxSwarmHandleFn<SS, SR>>,
96
97 /// How long we should wait between each bot joining the server.
98 ///
99 /// If this is None, every bot will connect at the same time. None is
100 /// different than a duration of 0, since if a duration is present the
101 /// bots will wait for the previous one to be ready.
102 pub(crate) join_delay: Option<Duration>,
103
104 /// The default reconnection delay for our bots.
105 ///
106 /// This will change the value of the [`AutoReconnectDelay`] resource.
107 pub(crate) reconnect_after: Option<Duration>,
108}
109impl SwarmBuilder<NoState, NoSwarmState, (), ()> {
110 /// Start creating the swarm.
111 #[must_use]
112 pub fn new() -> Self {
113 Self::new_without_plugins()
114 .add_plugins(DefaultPlugins)
115 .add_plugins(DefaultBotPlugins)
116 .add_plugins(DefaultSwarmPlugins)
117 }
118
119 /// [`Self::new`] but without adding the plugins by default.
120 ///
121 /// This is useful if you want to disable a default plugin. This also exists
122 /// for `ClientBuilder`, see [`ClientBuilder::new_without_plugins`].
123 ///
124 /// You **must** add [`DefaultPlugins`], [`DefaultBotPlugins`], and
125 /// [`DefaultSwarmPlugins`] to this.
126 ///
127 /// ```
128 /// # use azalea::{prelude::*, swarm::prelude::*};
129 /// use azalea::app::PluginGroup;
130 ///
131 /// let swarm_builder = SwarmBuilder::new_without_plugins()
132 /// .add_plugins(azalea::DefaultPlugins.build().disable::<azalea::chat_signing::ChatSigningPlugin>())
133 /// .add_plugins(azalea::bot::DefaultBotPlugins)
134 /// .add_plugins(azalea::swarm::DefaultSwarmPlugins);
135 /// # swarm_builder.set_handler(handle).set_swarm_handler(swarm_handle);
136 /// # #[derive(Component, Resource, Clone, Default)]
137 /// # pub struct State;
138 /// # async fn handle(mut bot: Client, event: Event, state: State) -> anyhow::Result<()> {
139 /// # Ok(())
140 /// # }
141 /// # async fn swarm_handle(swarm: Swarm, event: SwarmEvent, state: State) -> anyhow::Result<()> {
142 /// # Ok(())
143 /// # }
144 /// ```
145 ///
146 /// [`ClientBuilder::new_without_plugins`]: crate::ClientBuilder::new_without_plugins
147 #[must_use]
148 pub fn new_without_plugins() -> Self {
149 SwarmBuilder {
150 // we create the app here so plugins can add onto it.
151 // the schedules won't run until [`Self::start`] is called.
152
153 // `App::new()` is used instead of `SubApp::new()` so the necessary resources are
154 // initialized
155 app: mem::take(App::new().main_mut()),
156 accounts: Vec::new(),
157 states: Vec::new(),
158 swarm_state: NoSwarmState,
159 handler: None,
160 swarm_handler: None,
161 join_delay: None,
162 reconnect_after: Some(DEFAULT_RECONNECT_DELAY),
163 }
164 }
165}
166
167impl<SS, SR> SwarmBuilder<NoState, SS, (), SR>
168where
169 SS: Default + Send + Sync + Clone + Resource + 'static,
170{
171 /// Set the function that's called every time a bot receives an
172 /// [`enum@Event`]. This is the intended way to handle normal per-bot
173 /// events.
174 ///
175 /// Currently you can have up to one handler.
176 ///
177 /// Note that if you're creating clients directly from the ECS using
178 /// [`StartJoinServerEvent`] and the client wasn't already in the ECS, then
179 /// the handler function won't be called for that client. This also applies
180 /// to [`SwarmBuilder::set_swarm_handler`]. This shouldn't be a concern for
181 /// most bots, though.
182 ///
183 /// ```
184 /// # use azalea::{prelude::*, swarm::prelude::*};
185 /// # let swarm_builder = SwarmBuilder::new().set_swarm_handler(swarm_handle);
186 /// swarm_builder.set_handler(handle);
187 ///
188 /// #[derive(Component, Default, Clone)]
189 /// struct State {}
190 /// async fn handle(mut bot: Client, event: Event, state: State) -> anyhow::Result<()> {
191 /// Ok(())
192 /// }
193 ///
194 /// # #[derive(Resource, Default, Clone)]
195 /// # struct SwarmState {}
196 /// # async fn swarm_handle(
197 /// # mut swarm: Swarm,
198 /// # event: SwarmEvent,
199 /// # state: SwarmState,
200 /// # ) -> anyhow::Result<()> {
201 /// # Ok(())
202 /// # }
203 /// ```
204 ///
205 /// [`StartJoinServerEvent`]: azalea_client::join::StartJoinServerEvent
206 #[must_use]
207 pub fn set_handler<S, Fut, R>(self, handler: HandleFn<S, Fut>) -> SwarmBuilder<S, SS, R, SR>
208 where
209 Fut: Future<Output = R> + Send + 'static,
210 S: Send + Sync + Clone + Component + Default + 'static,
211 {
212 SwarmBuilder {
213 handler: Some(Box::new(move |bot, event, state: S| {
214 Box::pin(handler(bot, event, state))
215 })),
216 // if we added accounts before the State was set, we've gotta set it to the default now
217 states: vec![S::default(); self.accounts.len()],
218 app: self.app,
219 ..self
220 }
221 }
222}
223
224impl<S, R> SwarmBuilder<S, NoSwarmState, R, ()>
225where
226 S: Send + Sync + Clone + Component + 'static,
227{
228 /// Set the function that's called every time the swarm receives a
229 /// [`SwarmEvent`]. This is the intended way to handle global swarm events.
230 ///
231 /// Currently you can have up to one swarm handler.
232 ///
233 /// Note that if you're creating clients directly from the ECS using
234 /// [`StartJoinServerEvent`] and the client wasn't already in the ECS, then
235 /// this handler function won't be called for that client. This also applies
236 /// to [`SwarmBuilder::set_handler`]. This shouldn't be a concern for
237 /// most bots, though.
238 ///
239 /// ```
240 /// # use azalea::{prelude::*, swarm::prelude::*};
241 /// # let swarm_builder = SwarmBuilder::new().set_handler(handle);
242 /// swarm_builder.set_swarm_handler(swarm_handle);
243 ///
244 /// # #[derive(Component, Default, Clone)]
245 /// # struct State {}
246 ///
247 /// # async fn handle(mut bot: Client, event: Event, state: State) -> anyhow::Result<()> {
248 /// # Ok(())
249 /// # }
250 ///
251 /// #[derive(Resource, Default, Clone)]
252 /// struct SwarmState {}
253 /// async fn swarm_handle(
254 /// mut swarm: Swarm,
255 /// event: SwarmEvent,
256 /// state: SwarmState,
257 /// ) -> anyhow::Result<()> {
258 /// Ok(())
259 /// }
260 /// ```
261 ///
262 /// [`StartJoinServerEvent`]: azalea_client::join::StartJoinServerEvent
263 #[must_use]
264 pub fn set_swarm_handler<SS, Fut, SR>(
265 self,
266 handler: SwarmHandleFn<SS, Fut>,
267 ) -> SwarmBuilder<S, SS, R, SR>
268 where
269 SS: Default + Send + Sync + Clone + Resource + 'static,
270 Fut: Future<Output = SR> + Send + 'static,
271 {
272 SwarmBuilder {
273 handler: self.handler,
274 app: self.app,
275 accounts: self.accounts,
276 states: self.states,
277 swarm_state: SS::default(),
278 swarm_handler: Some(Box::new(move |swarm, event, state| {
279 Box::pin(handler(swarm, event, state))
280 })),
281 join_delay: self.join_delay,
282 reconnect_after: self.reconnect_after,
283 }
284 }
285}
286
287impl<S, SS, R, SR> SwarmBuilder<S, SS, R, SR>
288where
289 S: Send + Sync + Clone + Component + 'static,
290 SS: Default + Send + Sync + Clone + Resource + 'static,
291 R: Send + 'static,
292 SR: Send + 'static,
293{
294 /// Add a vec of [`Account`]s to the swarm.
295 ///
296 /// Use [`Self::add_account`] to only add one account. If you want the
297 /// clients to have different default states, add them one at a time with
298 /// [`Self::add_account_with_state`].
299 ///
300 /// By default, every account will join at the same time, you can add a
301 /// delay with [`Self::join_delay`].
302 #[must_use]
303 pub fn add_accounts(mut self, accounts: Vec<Account>) -> Self
304 where
305 S: Default,
306 {
307 for account in accounts {
308 self = self.add_account(account);
309 }
310
311 self
312 }
313
314 /// Add a single new [`Account`] to the swarm.
315 ///
316 /// Use [`Self::add_accounts`] to add multiple accounts at a time.
317 ///
318 /// This will make the state for this client be the default, use
319 /// [`Self::add_account_with_state`] to avoid that.
320 #[must_use]
321 pub fn add_account(self, account: Account) -> Self
322 where
323 S: Default,
324 {
325 self.add_account_with_state_and_opts(account, S::default(), JoinOpts::default())
326 }
327
328 /// Add an account with a custom initial state.
329 ///
330 /// Use just [`Self::add_account`] to use the `Default` implementation for
331 /// the state.
332 #[must_use]
333 pub fn add_account_with_state(self, account: Account, state: S) -> Self {
334 self.add_account_with_state_and_opts(account, state, JoinOpts::default())
335 }
336
337 /// Add an account with a custom initial state.
338 ///
339 /// Use just [`Self::add_account`] to use the `Default` implementation for
340 /// the state.
341 #[must_use]
342 pub fn add_account_with_opts(self, account: Account, opts: JoinOpts) -> Self
343 where
344 S: Default,
345 {
346 self.add_account_with_state_and_opts(account, S::default(), opts)
347 }
348
349 /// Same as [`Self::add_account_with_state`], but allow passing in custom
350 /// join options.
351 #[must_use]
352 pub fn add_account_with_state_and_opts(
353 mut self,
354 account: Account,
355 state: S,
356 join_opts: JoinOpts,
357 ) -> Self {
358 self.accounts.push((account, join_opts));
359 self.states.push(state);
360 self
361 }
362
363 /// Set the swarm state instead of initializing defaults.
364 #[must_use]
365 pub fn set_swarm_state(mut self, swarm_state: SS) -> Self {
366 self.swarm_state = swarm_state;
367 self
368 }
369
370 /// Add one or more plugins to this swarm.
371 ///
372 /// See [`Self::new_without_plugins`] to learn how to disable default
373 /// plugins.
374 #[must_use]
375 pub fn add_plugins<M>(mut self, plugins: impl Plugins<M>) -> Self {
376 self.app.add_plugins(plugins);
377 self
378 }
379
380 /// Set how long we should wait between each bot joining the server.
381 ///
382 /// By default, every bot will connect at the same time. If you set this
383 /// field, however, the bots will wait for the previous one to have
384 /// connected and *then* they'll wait the given duration.
385 #[must_use]
386 pub fn join_delay(mut self, delay: Duration) -> Self {
387 self.join_delay = Some(delay);
388 self
389 }
390
391 /// Configures the auto-reconnection behavior for our bots.
392 ///
393 /// If this is `Some`, then it'll set the default reconnection delay for our
394 /// bots (how long they'll wait after being kicked before they try
395 /// rejoining). if it's `None`, then auto-reconnecting will be disabled.
396 ///
397 /// If this function isn't called, then our clients will reconnect after
398 /// [`DEFAULT_RECONNECT_DELAY`].
399 #[must_use]
400 pub fn reconnect_after(mut self, delay: impl Into<Option<Duration>>) -> Self {
401 self.reconnect_after = delay.into();
402 self
403 }
404
405 /// Build this `SwarmBuilder` into an actual [`Swarm`] and join the given
406 /// server.
407 ///
408 /// The `address` argument can be a `&str`, [`ServerAddress`], or anything
409 /// that implements `TryInto<ServerAddress>`.
410 ///
411 /// [`ServerAddress`]: azalea_protocol::ServerAddress
412 pub async fn start(self, address: impl TryInto<ServerAddress>) -> Result<AppExit, StartError> {
413 // convert the TryInto<ServerAddress> into a ServerAddress
414 let address: ServerAddress = match address.try_into() {
415 Ok(address) => address,
416 Err(_) => return Err(StartError::InvalidAddress),
417 };
418
419 self.start_with_default_opts(address, JoinOpts::default())
420 .await
421 }
422
423 /// Do the same as [`Self::start`], but allow passing in default join
424 /// options for the bots.
425 pub async fn start_with_default_opts(
426 mut self,
427 address: impl TryInto<ServerAddress>,
428 default_join_opts: JoinOpts,
429 ) -> Result<AppExit, StartError> {
430 assert_eq!(
431 self.accounts.len(),
432 self.states.len(),
433 "There must be exactly one state per bot."
434 );
435
436 debug!("Starting Azalea {}", env!("CARGO_PKG_VERSION"));
437
438 // convert the TryInto<ServerAddress> into a ServerAddress
439 let address = match address.try_into() {
440 Ok(address) => address,
441 Err(_) => return Err(StartError::InvalidAddress),
442 };
443
444 let address: ServerAddress = default_join_opts.custom_address.clone().unwrap_or(address);
445 let resolved_address = if let Some(a) = default_join_opts.custom_resolved_address {
446 a
447 } else {
448 resolver::resolve_address(&address).await?
449 };
450
451 let instance_container = Arc::new(RwLock::new(InstanceContainer::default()));
452
453 // we can't modify the swarm plugins after this
454 let (bots_tx, mut bots_rx) = mpsc::unbounded_channel();
455 let (swarm_tx, mut swarm_rx) = mpsc::unbounded_channel();
456
457 swarm_tx.send(SwarmEvent::Init).unwrap();
458
459 let main_schedule_label = self.app.update_schedule.unwrap();
460
461 let (ecs_lock, start_running_systems, appexit_rx) = start_ecs_runner(&mut self.app);
462
463 let swarm = Swarm {
464 ecs_lock: ecs_lock.clone(),
465
466 resolved_address: Arc::new(RwLock::new(resolved_address)),
467 address: Arc::new(RwLock::new(address)),
468 instance_container,
469
470 bots_tx,
471
472 swarm_tx: swarm_tx.clone(),
473 };
474
475 // run the main schedule so the startup systems run
476 {
477 let mut ecs = ecs_lock.lock();
478 ecs.insert_resource(swarm.clone());
479 ecs.insert_resource(self.swarm_state.clone());
480 if let Some(reconnect_after) = self.reconnect_after {
481 ecs.insert_resource(AutoReconnectDelay {
482 delay: reconnect_after,
483 });
484 } else {
485 ecs.remove_resource::<AutoReconnectDelay>();
486 }
487 ecs.run_schedule(main_schedule_label);
488 ecs.clear_trackers();
489 }
490
491 // only do this after we inserted the Swarm and state resources to avoid errors
492 // where Res<Swarm> is inaccessible
493 start_running_systems();
494
495 // SwarmBuilder (self) isn't Send so we have to take all the things we need out
496 // of it
497 let swarm_clone = swarm.clone();
498 let join_delay = self.join_delay;
499 let accounts = self.accounts.clone();
500 let states = self.states.clone();
501
502 tokio::spawn(async move {
503 if let Some(join_delay) = join_delay {
504 // if there's a join delay, then join one by one
505 for ((account, bot_join_opts), state) in accounts.iter().zip(states) {
506 let mut join_opts = default_join_opts.clone();
507 join_opts.update(bot_join_opts);
508 let _ = swarm_clone.add_with_opts(account, state, &join_opts).await;
509 tokio::time::sleep(join_delay).await;
510 }
511 } else {
512 // otherwise, join all at once
513 let swarm_borrow = &swarm_clone;
514 join_all(accounts.iter().zip(states).map(
515 |((account, bot_join_opts), state)| async {
516 let mut join_opts = default_join_opts.clone();
517 join_opts.update(bot_join_opts);
518 let _ = swarm_borrow
519 .clone()
520 .add_with_opts(account, state, &join_opts)
521 .await;
522 },
523 ))
524 .await;
525 }
526
527 swarm_tx.send(SwarmEvent::Login).unwrap();
528 });
529
530 let swarm_state = self.swarm_state;
531
532 // Watch swarm_rx and send those events to the swarm_handle.
533 let swarm_clone = swarm.clone();
534 let swarm_handler_task = tokio::spawn(async move {
535 while let Some(event) = swarm_rx.recv().await {
536 if let Some(swarm_handler) = &self.swarm_handler {
537 tokio::spawn((swarm_handler)(
538 swarm_clone.clone(),
539 event,
540 swarm_state.clone(),
541 ));
542 }
543 }
544
545 unreachable!(
546 "The `Swarm` here contains a sender for the `SwarmEvent`s, so swarm_rx.recv() will never fail"
547 );
548 });
549
550 // bot events
551 let client_handler_task = tokio::spawn(async move {
552 while let Some((Some(first_event), first_bot)) = bots_rx.recv().await {
553 if bots_rx.len() > 1_000 {
554 static WARNED: AtomicBool = AtomicBool::new(false);
555 if !WARNED.swap(true, atomic::Ordering::Relaxed) {
556 warn!("the Client Event channel has more than 1000 items!")
557 }
558 }
559
560 if let Some(handler) = &self.handler {
561 let ecs_mutex = first_bot.ecs.clone();
562 let mut ecs = ecs_mutex.lock();
563 let mut query = ecs.query::<Option<&S>>();
564 let Ok(Some(first_bot_state)) = query.get(&ecs, first_bot.entity) else {
565 error!(
566 "the first bot ({} / {}) is missing the required state component! none of the client handler functions will be called.",
567 first_bot.username(),
568 first_bot.entity
569 );
570 continue;
571 };
572 let first_bot_entity = first_bot.entity;
573 let first_bot_state = first_bot_state.clone();
574
575 tokio::spawn((handler)(first_bot, first_event, first_bot_state.clone()));
576
577 // this makes it not have to keep locking the ecs
578 let mut states = HashMap::new();
579 states.insert(first_bot_entity, first_bot_state);
580 while let Ok((Some(event), bot)) = bots_rx.try_recv() {
581 let state = match states.entry(bot.entity) {
582 hash_map::Entry::Occupied(e) => e.into_mut(),
583 hash_map::Entry::Vacant(e) => {
584 let Ok(Some(state)) = query.get(&ecs, bot.entity) else {
585 error!(
586 "one of our bots ({} / {}) is missing the required state component! its client handler function will not be called.",
587 bot.username(),
588 bot.entity
589 );
590 continue;
591 };
592 let state = state.clone();
593 e.insert(state)
594 }
595 };
596 tokio::spawn((handler)(bot, event, state.clone()));
597 }
598 }
599 }
600 });
601
602 let appexit = appexit_rx
603 .await
604 .expect("appexit_tx shouldn't be dropped by the ECS runner before sending");
605
606 swarm_handler_task.abort();
607 client_handler_task.abort();
608
609 Ok(appexit)
610 }
611}
612
613impl Default for SwarmBuilder<NoState, NoSwarmState, (), ()> {
614 fn default() -> Self {
615 Self::new()
616 }
617}
618
619/// An event about something that doesn't have to do with a single bot.
620#[derive(Clone, Debug)]
621#[non_exhaustive]
622pub enum SwarmEvent {
623 /// All the bots in the swarm have successfully joined the server.
624 Login,
625 /// The swarm was created.
626 ///
627 /// This is only fired once, and it's guaranteed to be the first event to
628 /// fire.
629 Init,
630 /// A bot got disconnected from the server.
631 ///
632 /// If you'd like to implement special auto-reconnect behavior beyond what's
633 /// built-in, you can disable that with [`SwarmBuilder::reconnect_delay`]
634 /// and then call [`Swarm::add_with_opts`] with the account and options
635 /// from this event.
636 ///
637 /// [`SwarmBuilder::reconnect_delay`]: crate::swarm::SwarmBuilder::reconnect_after
638 Disconnect(Box<Account>, JoinOpts),
639 /// At least one bot received a chat message.
640 Chat(ChatPacket),
641}
642
643pub type SwarmHandleFn<SS, Fut> = fn(Swarm, SwarmEvent, SS) -> Fut;
644pub type BoxSwarmHandleFn<SS, R> =
645 Box<dyn Fn(Swarm, SwarmEvent, SS) -> BoxFuture<'static, R> + Send + Sync>;
646
647/// Make a bot [`Swarm`].
648///
649/// [`Swarm`]: struct.Swarm.html
650///
651/// # Examples
652/// ```rust,no_run
653/// use azalea::{prelude::*, swarm::prelude::*};
654/// use std::time::Duration;
655///
656/// #[derive(Default, Clone, Component)]
657/// struct State {}
658///
659/// #[derive(Default, Clone, Resource)]
660/// struct SwarmState {}
661///
662/// #[tokio::main(flavor = "current_thread")]
663/// async fn main() {
664/// let mut accounts = Vec::new();
665/// let mut states = Vec::new();
666///
667/// for i in 0..10 {
668/// accounts.push(Account::offline(&format!("bot{i}")));
669/// states.push(State::default());
670/// }
671///
672/// SwarmBuilder::new()
673/// .add_accounts(accounts.clone())
674/// .set_handler(handle)
675/// .set_swarm_handler(swarm_handle)
676/// .join_delay(Duration::from_millis(1000))
677/// .start("localhost")
678/// .await
679/// .unwrap();
680/// }
681///
682/// async fn handle(bot: Client, event: Event, _state: State) -> anyhow::Result<()> {
683/// match &event {
684/// _ => {}
685/// }
686/// Ok(())
687/// }
688///
689/// async fn swarm_handle(
690/// mut swarm: Swarm,
691/// event: SwarmEvent,
692/// _state: SwarmState,
693/// ) -> anyhow::Result<()> {
694/// match &event {
695/// SwarmEvent::Chat(m) => {
696/// println!("{}", m.message().to_ansi());
697/// }
698/// _ => {}
699/// }
700/// Ok(())
701/// }
702impl Swarm {
703 /// Add a new account to the swarm.
704 ///
705 /// You can remove it later by calling [`Client::disconnect`].
706 ///
707 /// # Errors
708 ///
709 /// Returns an error if the server's address could not be resolved.
710 pub async fn add<S: Component + Clone>(
711 &self,
712 account: &Account,
713 state: S,
714 ) -> Result<Client, JoinError> {
715 self.add_with_opts(account, state, &JoinOpts::default())
716 .await
717 }
718 /// Add a new account to the swarm, using custom options.
719 ///
720 /// This is useful if you want bots in the same swarm to connect to
721 /// different addresses. Usually you'll just want [`Self::add`] though.
722 ///
723 /// # Errors
724 ///
725 /// Returns an error if the server's address could not be resolved.
726 pub async fn add_with_opts<S: Component + Clone>(
727 &self,
728 account: &Account,
729 state: S,
730 join_opts: &JoinOpts,
731 ) -> Result<Client, JoinError> {
732 debug!(
733 "add_with_opts called for account {} with opts {join_opts:?}",
734 account.username
735 );
736
737 let address = join_opts
738 .custom_address
739 .clone()
740 .unwrap_or_else(|| self.address.read().clone());
741 let resolved_address = join_opts
742 .custom_resolved_address
743 .unwrap_or_else(|| *self.resolved_address.read());
744 let proxy = join_opts.proxy.clone();
745
746 let (tx, rx) = mpsc::unbounded_channel();
747
748 let client = Client::start_client(StartClientOpts {
749 ecs_lock: self.ecs_lock.clone(),
750 account: account.clone(),
751 connect_opts: ConnectOpts {
752 address,
753 resolved_address,
754 proxy,
755 },
756 event_sender: Some(tx),
757 })
758 .await;
759 // add the state to the client
760 {
761 let mut ecs = self.ecs_lock.lock();
762 ecs.entity_mut(client.entity).insert(state);
763 }
764
765 let cloned_bot = client.clone();
766 let swarm_tx = self.swarm_tx.clone();
767 let bots_tx = self.bots_tx.clone();
768
769 let join_opts = join_opts.clone();
770 tokio::spawn(Self::event_copying_task(
771 rx, swarm_tx, bots_tx, cloned_bot, join_opts,
772 ));
773
774 Ok(client)
775 }
776
777 /// Copy the events from a client's receiver into bots_tx, until the bot is
778 /// removed from the ECS.
779 async fn event_copying_task(
780 mut rx: mpsc::UnboundedReceiver<Event>,
781 swarm_tx: mpsc::UnboundedSender<SwarmEvent>,
782 bots_tx: mpsc::UnboundedSender<(Option<Event>, Client)>,
783 bot: Client,
784 join_opts: JoinOpts,
785 ) {
786 while let Some(event) = rx.recv().await {
787 if rx.len() > 1_000 {
788 static WARNED_1_000: AtomicBool = AtomicBool::new(false);
789 if !WARNED_1_000.swap(true, atomic::Ordering::Relaxed) {
790 warn!(
791 "the client's Event channel has more than 1,000 items! this is probably fine but if you're concerned about it, maybe consider disabling the packet-event feature in azalea to reduce the number of events?"
792 )
793 }
794
795 if rx.len() > 10_000 {
796 static WARNED_10_000: AtomicBool = AtomicBool::new(false);
797 if !WARNED_10_000.swap(true, atomic::Ordering::Relaxed) {
798 warn!("the client's Event channel has more than 10,000 items!!")
799 }
800
801 if rx.len() > 100_000 {
802 static WARNED_100_000: AtomicBool = AtomicBool::new(false);
803 if !WARNED_100_000.swap(true, atomic::Ordering::Relaxed) {
804 warn!("the client's Event channel has more than 100,000 items!!!")
805 }
806
807 if rx.len() > 1_000_000 {
808 static WARNED_1_000_000: AtomicBool = AtomicBool::new(false);
809 if !WARNED_1_000_000.swap(true, atomic::Ordering::Relaxed) {
810 warn!(
811 "the client's Event channel has more than 1,000,000 items!!!! your code is almost certainly leaking memory"
812 )
813 }
814 }
815 }
816 }
817 }
818
819 if let Event::Disconnect(_) = event {
820 debug!(
821 "sending SwarmEvent::Disconnect due to receiving an Event::Disconnect from client {}",
822 bot.entity
823 );
824 let account = bot
825 .get_component::<Account>()
826 .expect("bot is missing required Account component");
827 swarm_tx
828 .send(SwarmEvent::Disconnect(Box::new(account), join_opts.clone()))
829 .unwrap();
830 }
831
832 // we can't handle events here (since we can't copy the handler),
833 // they're handled above in SwarmBuilder::start
834 if let Err(e) = bots_tx.send((Some(event), bot.clone())) {
835 error!(
836 "Error sending event to swarm, aborting event_copying_task for {}: {e}",
837 bot.entity
838 );
839 break;
840 }
841 }
842 debug!(
843 "client sender ended for {}, this won't trigger SwarmEvent::Disconnect unless the client already sent its own disconnect event",
844 bot.entity
845 );
846 }
847
848 /// Add a new account to the swarm, retrying if it couldn't join.
849 ///
850 /// This will run forever until the bot joins or the task is aborted.
851 ///
852 /// This does exponential backoff (though very limited), starting at 5
853 /// seconds and doubling up to 15 seconds.
854 #[deprecated(note = "azalea has auto-reconnect functionality built-in now, use `add` instead")]
855 pub async fn add_and_retry_forever<S: Component + Clone>(
856 &self,
857 account: &Account,
858 state: S,
859 ) -> Client {
860 #[allow(deprecated)]
861 self.add_and_retry_forever_with_opts(account, state, &JoinOpts::default())
862 .await
863 }
864
865 /// Same as [`Self::add_and_retry_forever`], but allow passing custom join
866 /// options.
867 #[deprecated(
868 note = "azalea has auto-reconnect functionality built-in now, use `add_with_opts` instead"
869 )]
870 pub async fn add_and_retry_forever_with_opts<S: Component + Clone>(
871 &self,
872 account: &Account,
873 state: S,
874 opts: &JoinOpts,
875 ) -> Client {
876 let mut disconnects: u32 = 0;
877 loop {
878 match self.add_with_opts(account, state.clone(), opts).await {
879 Ok(bot) => return bot,
880 Err(e) => {
881 disconnects += 1;
882 let delay = (Duration::from_secs(5) * 2u32.pow(disconnects.min(16)))
883 .min(Duration::from_secs(15));
884 let username = account.username.clone();
885
886 error!("Error joining as {username}: {e}. Waiting {delay:?} and trying again.");
887
888 sleep(delay).await;
889 }
890 }
891 }
892 }
893
894 /// Get an array of ECS [`Entity`]s for all [`LocalEntity`]s in our world.
895 /// This will include clients that were disconnected without being removed
896 /// from the ECS.
897 ///
898 /// [`LocalEntity`]: azalea_entity::LocalEntity
899 pub fn client_entities(&self) -> Box<[Entity]> {
900 let mut ecs = self.ecs_lock.lock();
901 let mut query = ecs.query_filtered::<Entity, With<LocalEntity>>();
902 query.iter(&ecs).collect::<Box<[Entity]>>()
903 }
904}
905
906impl IntoIterator for Swarm {
907 type Item = Client;
908 type IntoIter = std::vec::IntoIter<Self::Item>;
909
910 /// Iterate over the bots in this swarm.
911 ///
912 /// ```rust,no_run
913 /// # use azalea::{prelude::*, swarm::prelude::*};
914 /// #[derive(Component, Clone)]
915 /// # pub struct State;
916 /// # fn example(swarm: Swarm) {
917 /// for bot in swarm {
918 /// let state = bot.component::<State>();
919 /// // ...
920 /// }
921 /// # }
922 /// ```
923 fn into_iter(self) -> Self::IntoIter {
924 let client_entities = self.client_entities();
925
926 client_entities
927 .into_iter()
928 .map(|entity| Client::new(entity, self.ecs_lock.clone()))
929 .collect::<Box<[Client]>>()
930 .into_iter()
931 }
932}
933
934/// This plugin group will add all the default plugins necessary for swarms to
935/// work.
936pub struct DefaultSwarmPlugins;
937
938impl PluginGroup for DefaultSwarmPlugins {
939 fn build(self) -> PluginGroupBuilder {
940 PluginGroupBuilder::start::<Self>()
941 .add(chat::SwarmChatPlugin)
942 .add(events::SwarmPlugin)
943 }
944}
945
946/// A marker that can be used in place of a SwarmState in [`SwarmBuilder`].
947///
948/// You probably don't need to use this manually since the compiler will infer
949/// it for you.
950#[derive(Resource, Clone, Default)]
951pub struct NoSwarmState;