Skip to main content

azalea/swarm/
mod.rs

1//! Swarms are a way to conveniently control many bots.
2//!
3//! See [`Swarm`] for more information.
4
5mod builder;
6mod chat;
7mod events;
8pub mod prelude;
9
10use std::sync::{
11    Arc,
12    atomic::{self, AtomicBool},
13};
14
15use azalea_client::{account::Account, client_chat::ChatPacket, join::ConnectOpts};
16use azalea_entity::LocalEntity;
17use azalea_protocol::address::ResolvedAddr;
18use azalea_world::Worlds;
19use bevy_app::{AppExit, PluginGroup, PluginGroupBuilder};
20use bevy_ecs::prelude::*;
21pub use builder::SwarmBuilder;
22use futures::future::BoxFuture;
23use parking_lot::RwLock;
24use tokio::{sync::mpsc, task};
25use tracing::{debug, error, warn};
26
27use crate::{Client, JoinOpts, client_impl::StartClientOpts};
28
/// A swarm is a way to conveniently control many bots at once, while also
/// being able to control bots at an individual level when desired.
///
/// A `Swarm` can safely be cloned, so there should be no need to wrap it in a
/// `Mutex`: the shared fields are behind [`Arc`]s and the remaining fields are
/// channel senders, so every clone refers to the same underlying state.
///
/// Swarms are created from [`SwarmBuilder`].
///
/// Clients can be added to the swarm later via [`Swarm::add`], and can be
/// removed with [`Client::disconnect`].
#[derive(Clone, Resource)]
pub struct Swarm {
    /// A way to directly access the ECS.
    ///
    /// This will not work if called within a system, as the ECS is already
    /// locked.
    #[doc(alias = "ecs_lock")] // former type name
    pub ecs: Arc<RwLock<World>>,

    /// The address that newly added clients connect to.
    ///
    /// [`Swarm::add_with_opts`] reads this and then applies any per-client
    /// overrides from the [`JoinOpts`] it was given.
    // the address is public and mutable so plugins can change it
    pub address: Arc<RwLock<ResolvedAddr>>,

    // NOTE(review): presumably the collection of world instances shared by the
    // bots in this swarm; confirm against `azalea_world::Worlds`.
    pub worlds: Arc<RwLock<Worlds>>,

    /// This is used internally to make the client handler function work.
    ///
    /// Each message pairs an optional client event with the [`Client`] it
    /// belongs to.
    pub(crate) bots_tx: mpsc::UnboundedSender<(Option<crate::Event>, Client)>,
    /// This is used internally to make the swarm handler function work.
    pub(crate) swarm_tx: mpsc::UnboundedSender<SwarmEvent>,
}
57
/// An event about something that doesn't have to do with a single bot.
///
/// The enum is `#[non_exhaustive]`, so handlers matching on it need a
/// catch-all arm.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub enum SwarmEvent {
    /// All the bots in the swarm have successfully joined the server.
    Login,
    /// The swarm was created.
    ///
    /// This is only fired once, and it's guaranteed to be the first event to
    /// fire.
    Init,
    /// This is fired every Minecraft tick (in the
    /// [`GameTick`](azalea_core::tick::GameTick) schedule).
    Tick,
    /// A bot got disconnected from the server.
    ///
    /// The payload is the account of the disconnected bot and the
    /// [`JoinOpts`] it was added with.
    ///
    /// If you'd like to implement special auto-reconnect behavior beyond what's
    /// built-in, you can disable that with [`SwarmBuilder::reconnect_after`]
    /// and then call [`Swarm::add_with_opts`] with the account and options
    /// from this event.
    ///
    /// [`SwarmBuilder::reconnect_after`]: crate::swarm::SwarmBuilder::reconnect_after
    Disconnect(Box<Account>, Box<JoinOpts>),
    /// At least one bot received a chat message.
    Chat(ChatPacket),
}
84
/// A plain function pointer that handles a [`SwarmEvent`].
///
/// It receives the [`Swarm`], the event, and a value of the swarm state type
/// `SS`, and returns `Fut`.
pub type SwarmHandleFn<SS, Fut> = fn(Swarm, SwarmEvent, SS) -> Fut;
/// A boxed, type-erased counterpart of [`SwarmHandleFn`].
///
/// The callable must be `Send + Sync`, and it returns a `'static`
/// [`BoxFuture`] that resolves to `R`.
pub type BoxSwarmHandleFn<SS, R> =
    Box<dyn Fn(Swarm, SwarmEvent, SS) -> BoxFuture<'static, R> + Send + Sync>;
88
89/// Make a bot [`Swarm`].
90///
91/// [`Swarm`]: struct.Swarm.html
92///
93/// # Examples
94/// ```rust,no_run
95/// use azalea::{prelude::*, swarm::prelude::*};
96/// use std::time::Duration;
97///
98/// #[derive(Clone, Component, Default)]
99/// struct State {}
100///
101/// #[derive(Clone, Default, Resource)]
102/// struct SwarmState {}
103///
104/// #[tokio::main]
105/// async fn main() -> AppExit {
106///     let mut accounts = Vec::new();
107///     let mut states = Vec::new();
108///
109///     for i in 0..10 {
110///         accounts.push(Account::offline(&format!("bot{i}")));
111///         states.push(State::default());
112///     }
113///
114///     SwarmBuilder::new()
115///         .add_accounts(accounts.clone())
116///         .set_handler(handle)
117///         .set_swarm_handler(swarm_handle)
118///         .join_delay(Duration::from_millis(1000))
119///         .start("localhost")
120///         .await
121/// }
122///
123/// async fn handle(bot: Client, event: Event, _state: State) -> eyre::Result<()> {
124///     match &event {
125///         _ => {}
126///     }
127///     Ok(())
128/// }
129///
130/// async fn swarm_handle(
131///     mut swarm: Swarm,
132///     event: SwarmEvent,
133///     _state: SwarmState,
134/// ) -> eyre::Result<()> {
135///     match &event {
136///         SwarmEvent::Chat(m) => {
137///             println!("{}", m.message().to_ansi());
138///         }
139///         _ => {}
140///     }
141///     Ok(())
/// }
/// ```
143impl Swarm {
144    /// Add a new account to the swarm.
145    ///
146    /// You can remove it later by calling [`Client::disconnect`].
147    ///
148    /// # Errors
149    ///
150    /// Returns an error if the server's address could not be resolved.
151    pub async fn add<S: Component + Clone>(&self, account: &Account, state: S) -> Client {
152        self.add_with_opts(account, state, &JoinOpts::default())
153            .await
154    }
155    /// Add a new account to the swarm, using custom options.
156    ///
157    /// This is useful if you want bots in the same swarm to connect to
158    /// different addresses. Usually you'll just want [`Self::add`] though.
159    ///
160    /// # Errors
161    ///
162    /// Returns an error if the server's address could not be resolved.
163    pub async fn add_with_opts<S: Component + Clone>(
164        &self,
165        account: &Account,
166        state: S,
167        join_opts: &JoinOpts,
168    ) -> Client {
169        debug!(
170            "add_with_opts called for account {} with opts {join_opts:?}",
171            account.username()
172        );
173
174        let mut address = self.address.read().clone();
175        if let Some(custom_server_addr) = join_opts.custom_server_addr.clone() {
176            address.server = custom_server_addr;
177        }
178        if let Some(custom_socket_addr) = join_opts.custom_socket_addr {
179            address.socket = custom_socket_addr;
180        }
181        let server_proxy = join_opts.server_proxy.clone();
182        let sessionserver_proxy = join_opts.sessionserver_proxy.clone();
183
184        let (tx, rx) = mpsc::unbounded_channel();
185
186        let client = Client::start_client(StartClientOpts {
187            ecs_lock: self.ecs.clone(),
188            account: account.clone(),
189            connect_opts: ConnectOpts {
190                address,
191                server_proxy,
192                sessionserver_proxy,
193            },
194            event_sender: Some(tx),
195        })
196        .await;
197        // add the state to the client
198        {
199            let mut ecs = self.ecs.write();
200            ecs.entity_mut(client.entity).insert(state);
201        }
202
203        let cloned_bot = client.clone();
204        let swarm_tx = self.swarm_tx.clone();
205        let bots_tx = self.bots_tx.clone();
206
207        let join_opts = join_opts.clone();
208        task::spawn_local(Self::event_copying_task(
209            rx, swarm_tx, bots_tx, cloned_bot, join_opts,
210        ));
211
212        client
213    }
214
215    /// Copy the events from a client's receiver into bots_tx, until the bot is
216    /// removed from the ECS.
217    async fn event_copying_task(
218        mut rx: mpsc::UnboundedReceiver<crate::Event>,
219        swarm_tx: mpsc::UnboundedSender<SwarmEvent>,
220        bots_tx: mpsc::UnboundedSender<(Option<crate::Event>, Client)>,
221        bot: Client,
222        join_opts: JoinOpts,
223    ) {
224        while let Some(event) = rx.recv().await {
225            if rx.len() > 1_000 {
226                static WARNED_1_000: AtomicBool = AtomicBool::new(false);
227                if !WARNED_1_000.swap(true, atomic::Ordering::Relaxed) {
228                    warn!(
229                        "The client's Event channel has more than 1,000 items! If you don't need it, consider disabling the `packet-event` feature for `azalea`."
230                    )
231                }
232
233                if rx.len() > 10_000 {
234                    static WARNED_10_000: AtomicBool = AtomicBool::new(false);
235                    if !WARNED_10_000.swap(true, atomic::Ordering::Relaxed) {
236                        warn!("The client's Event channel has more than 10,000 items!!")
237                    }
238
239                    if rx.len() > 100_000 {
240                        static WARNED_100_000: AtomicBool = AtomicBool::new(false);
241                        if !WARNED_100_000.swap(true, atomic::Ordering::Relaxed) {
242                            warn!("The client's Event channel has more than 100,000 items!!!")
243                        }
244
245                        if rx.len() > 1_000_000 {
246                            static WARNED_1_000_000: AtomicBool = AtomicBool::new(false);
247                            if !WARNED_1_000_000.swap(true, atomic::Ordering::Relaxed) {
248                                warn!(
249                                    "The client's Event channel has more than 1,000,000 items!!!! your code is almost certainly leaking memory"
250                                )
251                            }
252                        }
253                    }
254                }
255            }
256
257            if let crate::Event::Disconnect(_) = event {
258                debug!(
259                    "Sending SwarmEvent::Disconnect due to receiving an Event::Disconnect from client {}",
260                    bot.entity
261                );
262                let account = bot.account();
263                swarm_tx
264                    .send(SwarmEvent::Disconnect(
265                        Box::new(account),
266                        Box::new(join_opts.clone()),
267                    ))
268                    .unwrap();
269            }
270
271            // we can't handle events here (since we can't copy the handler),
272            // they're handled above in SwarmBuilder::start
273            if let Err(e) = bots_tx.send((Some(event), bot.clone())) {
274                error!(
275                    "Error sending event to swarm, aborting event_copying_task for {}: {e}",
276                    bot.entity
277                );
278                break;
279            }
280        }
281        debug!(
282            "client sender ended for {}, this won't trigger SwarmEvent::Disconnect unless the client already sent its own disconnect event",
283            bot.entity
284        );
285    }
286
287    /// Get an array of ECS [`Entity`]s for all [`LocalEntity`]s in our world.
288    /// This will include clients that were disconnected without being removed
289    /// from the ECS.
290    ///
291    /// [`LocalEntity`]: azalea_entity::LocalEntity
292    pub fn client_entities(&self) -> Box<[Entity]> {
293        let mut ecs = self.ecs.write();
294        let mut query = ecs.query_filtered::<Entity, With<LocalEntity>>();
295        query.iter(&ecs).collect::<Box<[Entity]>>()
296    }
297
298    /// End the entire swarm and return from [`SwarmBuilder::start`].
299    ///
300    /// You should typically avoid calling this if you intend on creating the
301    /// swarm again, because creating an entirely new swarm can be a
302    /// relatively expensive process.
303    ///
304    /// If you only want to change the server that the bots are connecting to,
305    /// it may be better to call [`Swarm::add_with_opts`] with a different
306    /// server address.
307    ///
308    /// This is also implemented on [`Client`] as [`Client::exit`].
309    pub fn exit(&self) {
310        self.ecs.write().write_message(AppExit::Success);
311    }
312}
313
314impl IntoIterator for Swarm {
315    type Item = Client;
316    type IntoIter = std::vec::IntoIter<Self::Item>;
317
318    /// Iterate over the bots in this swarm.
319    ///
320    /// ```rust,no_run
321    /// # use azalea::{prelude::*, swarm::prelude::*};
322    /// #[derive(Clone, Component)]
323    /// # pub struct State;
324    /// # fn example(swarm: Swarm) {
325    /// for bot in swarm {
326    ///     let state = bot.component::<State>();
327    ///     // ...
328    /// }
329    /// # }
330    /// ```
331    fn into_iter(self) -> Self::IntoIter {
332        let client_entities = self.client_entities();
333
334        client_entities
335            .into_iter()
336            .map(|entity| Client::new(entity, self.ecs.clone()))
337            .collect::<Box<[Client]>>()
338            .into_iter()
339    }
340}
341
342/// This plugin group will add all the default plugins necessary for swarms to
343/// work.
344pub struct DefaultSwarmPlugins;
345
346impl PluginGroup for DefaultSwarmPlugins {
347    fn build(self) -> PluginGroupBuilder {
348        PluginGroupBuilder::start::<Self>()
349            .add(chat::SwarmChatPlugin)
350            .add(events::SwarmPlugin)
351    }
352}
353
/// A marker that can be used in place of a SwarmState in [`SwarmBuilder`].
///
/// It carries no data and derives `Clone`, `Default`, and [`Resource`].
///
/// You probably don't need to use this manually since the compiler will infer
/// it for you.
#[derive(Clone, Default, Resource)]
pub struct NoSwarmState;