azalea/swarm/mod.rs
1//! Swarms are a way to conveniently control many bots.
2//!
3//! See [`Swarm`] for more information.
4
5mod builder;
6mod chat;
7mod events;
8pub mod prelude;
9
10use std::sync::{
11 Arc,
12 atomic::{self, AtomicBool},
13};
14
15use azalea_client::{account::Account, client_chat::ChatPacket, join::ConnectOpts};
16use azalea_entity::LocalEntity;
17use azalea_protocol::address::ResolvedAddr;
18use azalea_world::Worlds;
19use bevy_app::{AppExit, PluginGroup, PluginGroupBuilder};
20use bevy_ecs::prelude::*;
21pub use builder::SwarmBuilder;
22use futures::future::BoxFuture;
23use parking_lot::RwLock;
24use tokio::{sync::mpsc, task};
25use tracing::{debug, error, warn};
26
27use crate::{Client, JoinOpts, client_impl::StartClientOpts};
28
/// A swarm is a way to conveniently control many bots at once, while also
/// being able to control bots at an individual level when desired.
///
/// It can safely be cloned, so there should be no need to wrap it in a Mutex:
/// every field is either an `Arc` or a channel sender, so clones refer to the
/// same underlying state.
///
/// Swarms are created from [`SwarmBuilder`].
///
/// Clients can be added to the swarm later via [`Swarm::add`], and can be
/// removed with [`Client::disconnect`].
#[derive(Clone, Resource)]
pub struct Swarm {
    /// A way to directly access the ECS.
    ///
    /// This will not work if called within a system, as the ECS is already
    /// locked.
    #[doc(alias = "ecs_lock")] // former type name
    pub ecs: Arc<RwLock<World>>,

    /// The address that [`Swarm::add_with_opts`] copies as the starting point
    /// for each new bot's connection (individual parts can be overridden per
    /// bot through [`JoinOpts`]).
    ///
    /// The address is public and mutable so plugins can change it.
    pub address: Arc<RwLock<ResolvedAddr>>,

    /// A shared, lockable handle to the [`Worlds`] value from `azalea_world`.
    // NOTE(review): presumably the world instances shared by the bots in this
    // swarm; nothing in this file reads it, so confirm against `SwarmBuilder`.
    pub worlds: Arc<RwLock<Worlds>>,

    /// This is used internally to make the client handler function work.
    ///
    /// Each item pairs an optional [`Event`](crate::Event) with the [`Client`]
    /// it belongs to.
    pub(crate) bots_tx: mpsc::UnboundedSender<(Option<crate::Event>, Client)>,
    /// This is used internally to make the swarm handler function work.
    pub(crate) swarm_tx: mpsc::UnboundedSender<SwarmEvent>,
}
57
/// An event about something that doesn't have to do with a single bot.
///
/// The enum is `#[non_exhaustive]`, so matches on it need a wildcard arm.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub enum SwarmEvent {
    /// All the bots in the swarm have successfully joined the server.
    Login,
    /// The swarm was created.
    ///
    /// This is only fired once, and it's guaranteed to be the first event to
    /// fire.
    Init,
    /// This is fired every Minecraft tick (in the [`GameTick`] schedule).
    Tick,
    /// A bot got disconnected from the server.
    ///
    /// The fields are the disconnected bot's [`Account`] and the [`JoinOpts`]
    /// it was added to the swarm with.
    ///
    /// If you'd like to implement special auto-reconnect behavior beyond what's
    /// built-in, you can disable that with [`SwarmBuilder::reconnect_after`]
    /// and then call [`Swarm::add_with_opts`] with the account and options
    /// from this event.
    ///
    /// [`SwarmBuilder::reconnect_after`]: crate::swarm::SwarmBuilder::reconnect_after
    Disconnect(Box<Account>, Box<JoinOpts>),
    /// At least one bot received a chat message.
    Chat(ChatPacket),
}
83
/// A plain function pointer that handles a [`SwarmEvent`].
///
/// It receives the [`Swarm`], the event, and a swarm state value of type
/// `SS`, and returns `Fut`.
pub type SwarmHandleFn<SS, Fut> = fn(Swarm, SwarmEvent, SS) -> Fut;
/// The boxed, type-erased form of a swarm handler.
///
/// It takes the same arguments as [`SwarmHandleFn`], but is a `Send + Sync`
/// closure whose returned future is a [`BoxFuture`] resolving to `R`.
pub type BoxSwarmHandleFn<SS, R> =
    Box<dyn Fn(Swarm, SwarmEvent, SS) -> BoxFuture<'static, R> + Send + Sync>;
87
88/// Make a bot [`Swarm`].
89///
90/// [`Swarm`]: struct.Swarm.html
91///
92/// # Examples
93/// ```rust,no_run
94/// use azalea::{prelude::*, swarm::prelude::*};
95/// use std::time::Duration;
96///
97/// #[derive(Clone, Component, Default)]
98/// struct State {}
99///
100/// #[derive(Clone, Default, Resource)]
101/// struct SwarmState {}
102///
103/// #[tokio::main]
104/// async fn main() -> AppExit {
105/// let mut accounts = Vec::new();
106/// let mut states = Vec::new();
107///
108/// for i in 0..10 {
109/// accounts.push(Account::offline(&format!("bot{i}")));
110/// states.push(State::default());
111/// }
112///
113/// SwarmBuilder::new()
114/// .add_accounts(accounts.clone())
115/// .set_handler(handle)
116/// .set_swarm_handler(swarm_handle)
117/// .join_delay(Duration::from_millis(1000))
118/// .start("localhost")
119/// .await
120/// }
121///
122/// async fn handle(bot: Client, event: Event, _state: State) -> eyre::Result<()> {
123/// match &event {
124/// _ => {}
125/// }
126/// Ok(())
127/// }
128///
129/// async fn swarm_handle(
130/// mut swarm: Swarm,
131/// event: SwarmEvent,
132/// _state: SwarmState,
133/// ) -> eyre::Result<()> {
134/// match &event {
135/// SwarmEvent::Chat(m) => {
136/// println!("{}", m.message().to_ansi());
137/// }
138/// _ => {}
139/// }
140/// Ok(())
/// }
/// ```
142impl Swarm {
143 /// Add a new account to the swarm.
144 ///
145 /// You can remove it later by calling [`Client::disconnect`].
146 ///
147 /// # Errors
148 ///
149 /// Returns an error if the server's address could not be resolved.
150 pub async fn add<S: Component + Clone>(&self, account: &Account, state: S) -> Client {
151 self.add_with_opts(account, state, &JoinOpts::default())
152 .await
153 }
154 /// Add a new account to the swarm, using custom options.
155 ///
156 /// This is useful if you want bots in the same swarm to connect to
157 /// different addresses. Usually you'll just want [`Self::add`] though.
158 ///
159 /// # Errors
160 ///
161 /// Returns an error if the server's address could not be resolved.
162 pub async fn add_with_opts<S: Component + Clone>(
163 &self,
164 account: &Account,
165 state: S,
166 join_opts: &JoinOpts,
167 ) -> Client {
168 debug!(
169 "add_with_opts called for account {} with opts {join_opts:?}",
170 account.username()
171 );
172
173 let mut address = self.address.read().clone();
174 if let Some(custom_server_addr) = join_opts.custom_server_addr.clone() {
175 address.server = custom_server_addr;
176 }
177 if let Some(custom_socket_addr) = join_opts.custom_socket_addr {
178 address.socket = custom_socket_addr;
179 }
180 let server_proxy = join_opts.server_proxy.clone();
181 let sessionserver_proxy = join_opts.sessionserver_proxy.clone();
182
183 let (tx, rx) = mpsc::unbounded_channel();
184
185 let client = Client::start_client(StartClientOpts {
186 ecs_lock: self.ecs.clone(),
187 account: account.clone(),
188 connect_opts: ConnectOpts {
189 address,
190 server_proxy,
191 sessionserver_proxy,
192 },
193 event_sender: Some(tx),
194 })
195 .await;
196 // add the state to the client
197 {
198 let mut ecs = self.ecs.write();
199 ecs.entity_mut(client.entity).insert(state);
200 }
201
202 let cloned_bot = client.clone();
203 let swarm_tx = self.swarm_tx.clone();
204 let bots_tx = self.bots_tx.clone();
205
206 let join_opts = join_opts.clone();
207 task::spawn_local(Self::event_copying_task(
208 rx, swarm_tx, bots_tx, cloned_bot, join_opts,
209 ));
210
211 client
212 }
213
214 /// Copy the events from a client's receiver into bots_tx, until the bot is
215 /// removed from the ECS.
216 async fn event_copying_task(
217 mut rx: mpsc::UnboundedReceiver<crate::Event>,
218 swarm_tx: mpsc::UnboundedSender<SwarmEvent>,
219 bots_tx: mpsc::UnboundedSender<(Option<crate::Event>, Client)>,
220 bot: Client,
221 join_opts: JoinOpts,
222 ) {
223 while let Some(event) = rx.recv().await {
224 if rx.len() > 1_000 {
225 static WARNED_1_000: AtomicBool = AtomicBool::new(false);
226 if !WARNED_1_000.swap(true, atomic::Ordering::Relaxed) {
227 warn!(
228 "The client's Event channel has more than 1,000 items! If you don't need it, consider disabling the `packet-event` feature for `azalea`."
229 )
230 }
231
232 if rx.len() > 10_000 {
233 static WARNED_10_000: AtomicBool = AtomicBool::new(false);
234 if !WARNED_10_000.swap(true, atomic::Ordering::Relaxed) {
235 warn!("The client's Event channel has more than 10,000 items!!")
236 }
237
238 if rx.len() > 100_000 {
239 static WARNED_100_000: AtomicBool = AtomicBool::new(false);
240 if !WARNED_100_000.swap(true, atomic::Ordering::Relaxed) {
241 warn!("The client's Event channel has more than 100,000 items!!!")
242 }
243
244 if rx.len() > 1_000_000 {
245 static WARNED_1_000_000: AtomicBool = AtomicBool::new(false);
246 if !WARNED_1_000_000.swap(true, atomic::Ordering::Relaxed) {
247 warn!(
248 "The client's Event channel has more than 1,000,000 items!!!! your code is almost certainly leaking memory"
249 )
250 }
251 }
252 }
253 }
254 }
255
256 if let crate::Event::Disconnect(_) = event {
257 debug!(
258 "Sending SwarmEvent::Disconnect due to receiving an Event::Disconnect from client {}",
259 bot.entity
260 );
261 let account = bot.account();
262 swarm_tx
263 .send(SwarmEvent::Disconnect(
264 Box::new(account),
265 Box::new(join_opts.clone()),
266 ))
267 .unwrap();
268 }
269
270 // we can't handle events here (since we can't copy the handler),
271 // they're handled above in SwarmBuilder::start
272 if let Err(e) = bots_tx.send((Some(event), bot.clone())) {
273 error!(
274 "Error sending event to swarm, aborting event_copying_task for {}: {e}",
275 bot.entity
276 );
277 break;
278 }
279 }
280 debug!(
281 "client sender ended for {}, this won't trigger SwarmEvent::Disconnect unless the client already sent its own disconnect event",
282 bot.entity
283 );
284 }
285
286 /// Get an array of ECS [`Entity`]s for all [`LocalEntity`]s in our world.
287 /// This will include clients that were disconnected without being removed
288 /// from the ECS.
289 ///
290 /// [`LocalEntity`]: azalea_entity::LocalEntity
291 pub fn client_entities(&self) -> Box<[Entity]> {
292 let mut ecs = self.ecs.write();
293 let mut query = ecs.query_filtered::<Entity, With<LocalEntity>>();
294 query.iter(&ecs).collect::<Box<[Entity]>>()
295 }
296
297 /// End the entire swarm and return from [`SwarmBuilder::start`].
298 ///
299 /// You should typically avoid calling this if you intend on creating the
300 /// swarm again, because creating an entirely new swarm can be a
301 /// relatively expensive process.
302 ///
303 /// If you only want to change the server that the bots are connecting to,
304 /// it may be better to call [`Swarm::add_with_opts`] with a different
305 /// server address.
306 ///
307 /// This is also implemented on [`Client`] as [`Client::exit`].
308 pub fn exit(&self) {
309 self.ecs.write().write_message(AppExit::Success);
310 }
311}
312
313impl IntoIterator for Swarm {
314 type Item = Client;
315 type IntoIter = std::vec::IntoIter<Self::Item>;
316
317 /// Iterate over the bots in this swarm.
318 ///
319 /// ```rust,no_run
320 /// # use azalea::{prelude::*, swarm::prelude::*};
321 /// #[derive(Clone, Component)]
322 /// # pub struct State;
323 /// # fn example(swarm: Swarm) {
324 /// for bot in swarm {
325 /// let state = bot.component::<State>();
326 /// // ...
327 /// }
328 /// # }
329 /// ```
330 fn into_iter(self) -> Self::IntoIter {
331 let client_entities = self.client_entities();
332
333 client_entities
334 .into_iter()
335 .map(|entity| Client::new(entity, self.ecs.clone()))
336 .collect::<Box<[Client]>>()
337 .into_iter()
338 }
339}
340
341/// This plugin group will add all the default plugins necessary for swarms to
342/// work.
343pub struct DefaultSwarmPlugins;
344
345impl PluginGroup for DefaultSwarmPlugins {
346 fn build(self) -> PluginGroupBuilder {
347 PluginGroupBuilder::start::<Self>()
348 .add(chat::SwarmChatPlugin)
349 .add(events::SwarmPlugin)
350 }
351}
352
/// A marker that can be used in place of a SwarmState in [`SwarmBuilder`].
///
/// It is a field-less unit struct, so it carries no data of its own.
///
/// You probably don't need to use this manually since the compiler will infer
/// it for you.
#[derive(Clone, Default, Resource)]
pub struct NoSwarmState;