azalea/swarm/mod.rs
1//! Swarms are a way to conveniently control many bots.
2//!
3//! See [`Swarm`] for more information.
4
5mod builder;
6mod chat;
7mod events;
8pub mod prelude;
9
10use std::sync::{
11 Arc,
12 atomic::{self, AtomicBool},
13};
14
15use azalea_client::{Account, Client, Event, StartClientOpts, chat::ChatPacket, join::ConnectOpts};
16use azalea_entity::LocalEntity;
17use azalea_protocol::address::ResolvedAddr;
18use azalea_world::InstanceContainer;
19use bevy_app::{PluginGroup, PluginGroupBuilder};
20use bevy_ecs::prelude::*;
21pub use builder::SwarmBuilder;
22use futures::future::BoxFuture;
23use parking_lot::{Mutex, RwLock};
24use tokio::{sync::mpsc, task};
25use tracing::{debug, error, warn};
26
27use crate::JoinOpts;
28
29/// A swarm is a way to conveniently control many bots at once, while also
30/// being able to control bots at an individual level when desired.
31///
32/// It can safely be cloned, so there should be no need to wrap them in a Mutex.
33///
34/// Swarms are created from [`SwarmBuilder`].
35///
36/// Clients can be added to the swarm later via [`Swarm::add`], and can be
37/// removed with [`Client::disconnect`].
38#[derive(Clone, Resource)]
39pub struct Swarm {
40 pub ecs_lock: Arc<Mutex<World>>,
41
42 // the address is public and mutable so plugins can change it
43 pub address: Arc<RwLock<ResolvedAddr>>,
44
45 pub instance_container: Arc<RwLock<InstanceContainer>>,
46
47 /// This is used internally to make the client handler function work.
48 pub(crate) bots_tx: mpsc::UnboundedSender<(Option<Event>, Client)>,
49 /// This is used internally to make the swarm handler function work.
50 pub(crate) swarm_tx: mpsc::UnboundedSender<SwarmEvent>,
51}
52
53/// An event about something that doesn't have to do with a single bot.
54#[derive(Clone, Debug)]
55#[non_exhaustive]
56pub enum SwarmEvent {
57 /// All the bots in the swarm have successfully joined the server.
58 Login,
59 /// The swarm was created.
60 ///
61 /// This is only fired once, and it's guaranteed to be the first event to
62 /// fire.
63 Init,
64 /// A bot got disconnected from the server.
65 ///
66 /// If you'd like to implement special auto-reconnect behavior beyond what's
67 /// built-in, you can disable that with [`SwarmBuilder::reconnect_delay`]
68 /// and then call [`Swarm::add_with_opts`] with the account and options
69 /// from this event.
70 ///
71 /// [`SwarmBuilder::reconnect_delay`]: crate::swarm::SwarmBuilder::reconnect_after
72 Disconnect(Box<Account>, Box<JoinOpts>),
73 /// At least one bot received a chat message.
74 Chat(ChatPacket),
75}
76
77pub type SwarmHandleFn<SS, Fut> = fn(Swarm, SwarmEvent, SS) -> Fut;
78pub type BoxSwarmHandleFn<SS, R> =
79 Box<dyn Fn(Swarm, SwarmEvent, SS) -> BoxFuture<'static, R> + Send + Sync>;
80
81/// Make a bot [`Swarm`].
82///
83/// [`Swarm`]: struct.Swarm.html
84///
85/// # Examples
86/// ```rust,no_run
87/// use azalea::{prelude::*, swarm::prelude::*};
88/// use std::time::Duration;
89///
90/// #[derive(Clone, Component, Default)]
91/// struct State {}
92///
93/// #[derive(Clone, Default, Resource)]
94/// struct SwarmState {}
95///
96/// #[tokio::main]
97/// async fn main() -> AppExit {
98/// let mut accounts = Vec::new();
99/// let mut states = Vec::new();
100///
101/// for i in 0..10 {
102/// accounts.push(Account::offline(&format!("bot{i}")));
103/// states.push(State::default());
104/// }
105///
106/// SwarmBuilder::new()
107/// .add_accounts(accounts.clone())
108/// .set_handler(handle)
109/// .set_swarm_handler(swarm_handle)
110/// .join_delay(Duration::from_millis(1000))
111/// .start("localhost")
112/// .await
113/// }
114///
115/// async fn handle(bot: Client, event: Event, _state: State) -> anyhow::Result<()> {
116/// match &event {
117/// _ => {}
118/// }
119/// Ok(())
120/// }
121///
122/// async fn swarm_handle(
123/// mut swarm: Swarm,
124/// event: SwarmEvent,
125/// _state: SwarmState,
126/// ) -> anyhow::Result<()> {
127/// match &event {
128/// SwarmEvent::Chat(m) => {
129/// println!("{}", m.message().to_ansi());
130/// }
131/// _ => {}
132/// }
133/// Ok(())
134/// }
135impl Swarm {
136 /// Add a new account to the swarm.
137 ///
138 /// You can remove it later by calling [`Client::disconnect`].
139 ///
140 /// # Errors
141 ///
142 /// Returns an error if the server's address could not be resolved.
143 pub async fn add<S: Component + Clone>(&self, account: &Account, state: S) -> Client {
144 self.add_with_opts(account, state, &JoinOpts::default())
145 .await
146 }
147 /// Add a new account to the swarm, using custom options.
148 ///
149 /// This is useful if you want bots in the same swarm to connect to
150 /// different addresses. Usually you'll just want [`Self::add`] though.
151 ///
152 /// # Errors
153 ///
154 /// Returns an error if the server's address could not be resolved.
155 pub async fn add_with_opts<S: Component + Clone>(
156 &self,
157 account: &Account,
158 state: S,
159 join_opts: &JoinOpts,
160 ) -> Client {
161 debug!(
162 "add_with_opts called for account {} with opts {join_opts:?}",
163 account.username
164 );
165
166 let mut address = self.address.read().clone();
167 if let Some(custom_server_addr) = join_opts.custom_server_addr.clone() {
168 address.server = custom_server_addr;
169 }
170 if let Some(custom_socket_addr) = join_opts.custom_socket_addr {
171 address.socket = custom_socket_addr;
172 }
173 let server_proxy = join_opts.server_proxy.clone();
174 let sessionserver_proxy = join_opts.sessionserver_proxy.clone();
175
176 let (tx, rx) = mpsc::unbounded_channel();
177
178 let client = Client::start_client(StartClientOpts {
179 ecs_lock: self.ecs_lock.clone(),
180 account: account.clone(),
181 connect_opts: ConnectOpts {
182 address,
183 server_proxy,
184 sessionserver_proxy,
185 },
186 event_sender: Some(tx),
187 })
188 .await;
189 // add the state to the client
190 {
191 let mut ecs = self.ecs_lock.lock();
192 ecs.entity_mut(client.entity).insert(state);
193 }
194
195 let cloned_bot = client.clone();
196 let swarm_tx = self.swarm_tx.clone();
197 let bots_tx = self.bots_tx.clone();
198
199 let join_opts = join_opts.clone();
200 task::spawn_local(Self::event_copying_task(
201 rx, swarm_tx, bots_tx, cloned_bot, join_opts,
202 ));
203
204 client
205 }
206
207 /// Copy the events from a client's receiver into bots_tx, until the bot is
208 /// removed from the ECS.
209 async fn event_copying_task(
210 mut rx: mpsc::UnboundedReceiver<Event>,
211 swarm_tx: mpsc::UnboundedSender<SwarmEvent>,
212 bots_tx: mpsc::UnboundedSender<(Option<Event>, Client)>,
213 bot: Client,
214 join_opts: JoinOpts,
215 ) {
216 while let Some(event) = rx.recv().await {
217 if rx.len() > 1_000 {
218 static WARNED_1_000: AtomicBool = AtomicBool::new(false);
219 if !WARNED_1_000.swap(true, atomic::Ordering::Relaxed) {
220 warn!(
221 "the client's Event channel has more than 1,000 items! this is probably fine but if you're concerned about it, maybe consider disabling the packet-event feature in azalea to reduce the number of events?"
222 )
223 }
224
225 if rx.len() > 10_000 {
226 static WARNED_10_000: AtomicBool = AtomicBool::new(false);
227 if !WARNED_10_000.swap(true, atomic::Ordering::Relaxed) {
228 warn!("the client's Event channel has more than 10,000 items!!")
229 }
230
231 if rx.len() > 100_000 {
232 static WARNED_100_000: AtomicBool = AtomicBool::new(false);
233 if !WARNED_100_000.swap(true, atomic::Ordering::Relaxed) {
234 warn!("the client's Event channel has more than 100,000 items!!!")
235 }
236
237 if rx.len() > 1_000_000 {
238 static WARNED_1_000_000: AtomicBool = AtomicBool::new(false);
239 if !WARNED_1_000_000.swap(true, atomic::Ordering::Relaxed) {
240 warn!(
241 "the client's Event channel has more than 1,000,000 items!!!! your code is almost certainly leaking memory"
242 )
243 }
244 }
245 }
246 }
247 }
248
249 if let Event::Disconnect(_) = event {
250 debug!(
251 "sending SwarmEvent::Disconnect due to receiving an Event::Disconnect from client {}",
252 bot.entity
253 );
254 let account = bot
255 .get_component::<Account>()
256 .expect("bot is missing required Account component");
257 swarm_tx
258 .send(SwarmEvent::Disconnect(
259 Box::new(account),
260 Box::new(join_opts.clone()),
261 ))
262 .unwrap();
263 }
264
265 // we can't handle events here (since we can't copy the handler),
266 // they're handled above in SwarmBuilder::start
267 if let Err(e) = bots_tx.send((Some(event), bot.clone())) {
268 error!(
269 "Error sending event to swarm, aborting event_copying_task for {}: {e}",
270 bot.entity
271 );
272 break;
273 }
274 }
275 debug!(
276 "client sender ended for {}, this won't trigger SwarmEvent::Disconnect unless the client already sent its own disconnect event",
277 bot.entity
278 );
279 }
280
281 /// Get an array of ECS [`Entity`]s for all [`LocalEntity`]s in our world.
282 /// This will include clients that were disconnected without being removed
283 /// from the ECS.
284 ///
285 /// [`LocalEntity`]: azalea_entity::LocalEntity
286 pub fn client_entities(&self) -> Box<[Entity]> {
287 let mut ecs = self.ecs_lock.lock();
288 let mut query = ecs.query_filtered::<Entity, With<LocalEntity>>();
289 query.iter(&ecs).collect::<Box<[Entity]>>()
290 }
291}
292
293impl IntoIterator for Swarm {
294 type Item = Client;
295 type IntoIter = std::vec::IntoIter<Self::Item>;
296
297 /// Iterate over the bots in this swarm.
298 ///
299 /// ```rust,no_run
300 /// # use azalea::{prelude::*, swarm::prelude::*};
301 /// #[derive(Clone, Component)]
302 /// # pub struct State;
303 /// # fn example(swarm: Swarm) {
304 /// for bot in swarm {
305 /// let state = bot.component::<State>();
306 /// // ...
307 /// }
308 /// # }
309 /// ```
310 fn into_iter(self) -> Self::IntoIter {
311 let client_entities = self.client_entities();
312
313 client_entities
314 .into_iter()
315 .map(|entity| Client::new(entity, self.ecs_lock.clone()))
316 .collect::<Box<[Client]>>()
317 .into_iter()
318 }
319}
320
321/// This plugin group will add all the default plugins necessary for swarms to
322/// work.
323pub struct DefaultSwarmPlugins;
324
325impl PluginGroup for DefaultSwarmPlugins {
326 fn build(self) -> PluginGroupBuilder {
327 PluginGroupBuilder::start::<Self>()
328 .add(chat::SwarmChatPlugin)
329 .add(events::SwarmPlugin)
330 }
331}
332
333/// A marker that can be used in place of a SwarmState in [`SwarmBuilder`].
334///
335/// You probably don't need to use this manually since the compiler will infer
336/// it for you.
337#[derive(Clone, Default, Resource)]
338pub struct NoSwarmState;