azalea/swarm/mod.rs
1//! Swarms are a way to conveniently control many bots.
2//!
3//! See [`Swarm`] for more information.
4
5mod builder;
6mod chat;
7mod events;
8pub mod prelude;
9
10use std::sync::{
11 Arc,
12 atomic::{self, AtomicBool},
13};
14
15use azalea_client::{account::Account, client_chat::ChatPacket, join::ConnectOpts};
16use azalea_entity::LocalEntity;
17use azalea_protocol::address::ResolvedAddr;
18use azalea_world::Worlds;
19use bevy_app::{AppExit, PluginGroup, PluginGroupBuilder};
20use bevy_ecs::prelude::*;
21pub use builder::SwarmBuilder;
22use futures::future::BoxFuture;
23use parking_lot::RwLock;
24use tokio::{sync::mpsc, task};
25use tracing::{debug, error, warn};
26
27use crate::{Client, JoinOpts, client_impl::StartClientOpts};
28
29/// A swarm is a way to conveniently control many bots at once, while also
30/// being able to control bots at an individual level when desired.
31///
32/// It can safely be cloned, so there should be no need to wrap them in a Mutex.
33///
34/// Swarms are created from [`SwarmBuilder`].
35///
36/// Clients can be added to the swarm later via [`Swarm::add`], and can be
37/// removed with [`Client::disconnect`].
38#[derive(Clone, Resource)]
39pub struct Swarm {
40 /// A way to directly access the ECS.
41 ///
42 /// This will not work if called within a system, as the ECS is already
43 /// locked.
44 #[doc(alias = "ecs_lock")] // former type name
45 pub ecs: Arc<RwLock<World>>,
46
47 // the address is public and mutable so plugins can change it
48 pub address: Arc<RwLock<ResolvedAddr>>,
49
50 pub worlds: Arc<RwLock<Worlds>>,
51
52 /// This is used internally to make the client handler function work.
53 pub(crate) bots_tx: mpsc::UnboundedSender<(Option<crate::Event>, Client)>,
54 /// This is used internally to make the swarm handler function work.
55 pub(crate) swarm_tx: mpsc::UnboundedSender<SwarmEvent>,
56}
57
58/// An event about something that doesn't have to do with a single bot.
59#[derive(Clone, Debug)]
60#[non_exhaustive]
61pub enum SwarmEvent {
62 /// All the bots in the swarm have successfully joined the server.
63 Login,
64 /// The swarm was created.
65 ///
66 /// This is only fired once, and it's guaranteed to be the first event to
67 /// fire.
68 Init,
69 /// This is fired every Minecraft tick (in the
70 /// [`GameTick`](azalea_core::tick::GameTick) schedule).
71 Tick,
72 /// A bot got disconnected from the server.
73 ///
74 /// If you'd like to implement special auto-reconnect behavior beyond what's
75 /// built-in, you can disable that with [`SwarmBuilder::reconnect_delay`]
76 /// and then call [`Swarm::add_with_opts`] with the account and options
77 /// from this event.
78 ///
79 /// [`SwarmBuilder::reconnect_delay`]: crate::swarm::SwarmBuilder::reconnect_after
80 Disconnect(Box<Account>, Box<JoinOpts>),
81 /// At least one bot received a chat message.
82 Chat(ChatPacket),
83}
84
85pub type SwarmHandleFn<SS, Fut> = fn(Swarm, SwarmEvent, SS) -> Fut;
86pub type BoxSwarmHandleFn<SS, R> =
87 Box<dyn Fn(Swarm, SwarmEvent, SS) -> BoxFuture<'static, R> + Send + Sync>;
88
89/// Make a bot [`Swarm`].
90///
91/// [`Swarm`]: struct.Swarm.html
92///
93/// # Examples
94/// ```rust,no_run
95/// use azalea::{prelude::*, swarm::prelude::*};
96/// use std::time::Duration;
97///
98/// #[derive(Clone, Component, Default)]
99/// struct State {}
100///
101/// #[derive(Clone, Default, Resource)]
102/// struct SwarmState {}
103///
104/// #[tokio::main]
105/// async fn main() -> AppExit {
106/// let mut accounts = Vec::new();
107/// let mut states = Vec::new();
108///
109/// for i in 0..10 {
110/// accounts.push(Account::offline(&format!("bot{i}")));
111/// states.push(State::default());
112/// }
113///
114/// SwarmBuilder::new()
115/// .add_accounts(accounts.clone())
116/// .set_handler(handle)
117/// .set_swarm_handler(swarm_handle)
118/// .join_delay(Duration::from_millis(1000))
119/// .start("localhost")
120/// .await
121/// }
122///
123/// async fn handle(bot: Client, event: Event, _state: State) -> eyre::Result<()> {
124/// match &event {
125/// _ => {}
126/// }
127/// Ok(())
128/// }
129///
130/// async fn swarm_handle(
131/// mut swarm: Swarm,
132/// event: SwarmEvent,
133/// _state: SwarmState,
134/// ) -> eyre::Result<()> {
135/// match &event {
136/// SwarmEvent::Chat(m) => {
137/// println!("{}", m.message().to_ansi());
138/// }
139/// _ => {}
140/// }
141/// Ok(())
142/// }
143impl Swarm {
144 /// Add a new account to the swarm.
145 ///
146 /// You can remove it later by calling [`Client::disconnect`].
147 ///
148 /// # Errors
149 ///
150 /// Returns an error if the server's address could not be resolved.
151 pub async fn add<S: Component + Clone>(&self, account: &Account, state: S) -> Client {
152 self.add_with_opts(account, state, &JoinOpts::default())
153 .await
154 }
155 /// Add a new account to the swarm, using custom options.
156 ///
157 /// This is useful if you want bots in the same swarm to connect to
158 /// different addresses. Usually you'll just want [`Self::add`] though.
159 ///
160 /// # Errors
161 ///
162 /// Returns an error if the server's address could not be resolved.
163 pub async fn add_with_opts<S: Component + Clone>(
164 &self,
165 account: &Account,
166 state: S,
167 join_opts: &JoinOpts,
168 ) -> Client {
169 debug!(
170 "add_with_opts called for account {} with opts {join_opts:?}",
171 account.username()
172 );
173
174 let mut address = self.address.read().clone();
175 if let Some(custom_server_addr) = join_opts.custom_server_addr.clone() {
176 address.server = custom_server_addr;
177 }
178 if let Some(custom_socket_addr) = join_opts.custom_socket_addr {
179 address.socket = custom_socket_addr;
180 }
181 let server_proxy = join_opts.server_proxy.clone();
182 let sessionserver_proxy = join_opts.sessionserver_proxy.clone();
183
184 let (tx, rx) = mpsc::unbounded_channel();
185
186 let client = Client::start_client(StartClientOpts {
187 ecs_lock: self.ecs.clone(),
188 account: account.clone(),
189 connect_opts: ConnectOpts {
190 address,
191 server_proxy,
192 sessionserver_proxy,
193 },
194 event_sender: Some(tx),
195 })
196 .await;
197 // add the state to the client
198 {
199 let mut ecs = self.ecs.write();
200 ecs.entity_mut(client.entity).insert(state);
201 }
202
203 let cloned_bot = client.clone();
204 let swarm_tx = self.swarm_tx.clone();
205 let bots_tx = self.bots_tx.clone();
206
207 let join_opts = join_opts.clone();
208 task::spawn_local(Self::event_copying_task(
209 rx, swarm_tx, bots_tx, cloned_bot, join_opts,
210 ));
211
212 client
213 }
214
215 /// Copy the events from a client's receiver into bots_tx, until the bot is
216 /// removed from the ECS.
217 async fn event_copying_task(
218 mut rx: mpsc::UnboundedReceiver<crate::Event>,
219 swarm_tx: mpsc::UnboundedSender<SwarmEvent>,
220 bots_tx: mpsc::UnboundedSender<(Option<crate::Event>, Client)>,
221 bot: Client,
222 join_opts: JoinOpts,
223 ) {
224 while let Some(event) = rx.recv().await {
225 if rx.len() > 1_000 {
226 static WARNED_1_000: AtomicBool = AtomicBool::new(false);
227 if !WARNED_1_000.swap(true, atomic::Ordering::Relaxed) {
228 warn!(
229 "The client's Event channel has more than 1,000 items! If you don't need it, consider disabling the `packet-event` feature for `azalea`."
230 )
231 }
232
233 if rx.len() > 10_000 {
234 static WARNED_10_000: AtomicBool = AtomicBool::new(false);
235 if !WARNED_10_000.swap(true, atomic::Ordering::Relaxed) {
236 warn!("The client's Event channel has more than 10,000 items!!")
237 }
238
239 if rx.len() > 100_000 {
240 static WARNED_100_000: AtomicBool = AtomicBool::new(false);
241 if !WARNED_100_000.swap(true, atomic::Ordering::Relaxed) {
242 warn!("The client's Event channel has more than 100,000 items!!!")
243 }
244
245 if rx.len() > 1_000_000 {
246 static WARNED_1_000_000: AtomicBool = AtomicBool::new(false);
247 if !WARNED_1_000_000.swap(true, atomic::Ordering::Relaxed) {
248 warn!(
249 "The client's Event channel has more than 1,000,000 items!!!! your code is almost certainly leaking memory"
250 )
251 }
252 }
253 }
254 }
255 }
256
257 if let crate::Event::Disconnect(_) = event {
258 debug!(
259 "Sending SwarmEvent::Disconnect due to receiving an Event::Disconnect from client {}",
260 bot.entity
261 );
262 let account = bot.account();
263 swarm_tx
264 .send(SwarmEvent::Disconnect(
265 Box::new(account),
266 Box::new(join_opts.clone()),
267 ))
268 .unwrap();
269 }
270
271 // we can't handle events here (since we can't copy the handler),
272 // they're handled above in SwarmBuilder::start
273 if let Err(e) = bots_tx.send((Some(event), bot.clone())) {
274 error!(
275 "Error sending event to swarm, aborting event_copying_task for {}: {e}",
276 bot.entity
277 );
278 break;
279 }
280 }
281 debug!(
282 "client sender ended for {}, this won't trigger SwarmEvent::Disconnect unless the client already sent its own disconnect event",
283 bot.entity
284 );
285 }
286
287 /// Get an array of ECS [`Entity`]s for all [`LocalEntity`]s in our world.
288 /// This will include clients that were disconnected without being removed
289 /// from the ECS.
290 ///
291 /// [`LocalEntity`]: azalea_entity::LocalEntity
292 pub fn client_entities(&self) -> Box<[Entity]> {
293 let mut ecs = self.ecs.write();
294 let mut query = ecs.query_filtered::<Entity, With<LocalEntity>>();
295 query.iter(&ecs).collect::<Box<[Entity]>>()
296 }
297
298 /// End the entire swarm and return from [`SwarmBuilder::start`].
299 ///
300 /// You should typically avoid calling this if you intend on creating the
301 /// swarm again, because creating an entirely new swarm can be a
302 /// relatively expensive process.
303 ///
304 /// If you only want to change the server that the bots are connecting to,
305 /// it may be better to call [`Swarm::add_with_opts`] with a different
306 /// server address.
307 ///
308 /// This is also implemented on [`Client`] as [`Client::exit`].
309 pub fn exit(&self) {
310 self.ecs.write().write_message(AppExit::Success);
311 }
312}
313
314impl IntoIterator for Swarm {
315 type Item = Client;
316 type IntoIter = std::vec::IntoIter<Self::Item>;
317
318 /// Iterate over the bots in this swarm.
319 ///
320 /// ```rust,no_run
321 /// # use azalea::{prelude::*, swarm::prelude::*};
322 /// #[derive(Clone, Component)]
323 /// # pub struct State;
324 /// # fn example(swarm: Swarm) {
325 /// for bot in swarm {
326 /// let state = bot.component::<State>();
327 /// // ...
328 /// }
329 /// # }
330 /// ```
331 fn into_iter(self) -> Self::IntoIter {
332 let client_entities = self.client_entities();
333
334 client_entities
335 .into_iter()
336 .map(|entity| Client::new(entity, self.ecs.clone()))
337 .collect::<Box<[Client]>>()
338 .into_iter()
339 }
340}
341
342/// This plugin group will add all the default plugins necessary for swarms to
343/// work.
344pub struct DefaultSwarmPlugins;
345
346impl PluginGroup for DefaultSwarmPlugins {
347 fn build(self) -> PluginGroupBuilder {
348 PluginGroupBuilder::start::<Self>()
349 .add(chat::SwarmChatPlugin)
350 .add(events::SwarmPlugin)
351 }
352}
353
354/// A marker that can be used in place of a SwarmState in [`SwarmBuilder`].
355///
356/// You probably don't need to use this manually since the compiler will infer
357/// it for you.
358#[derive(Clone, Default, Resource)]
359pub struct NoSwarmState;