1use std::{
2 collections::HashMap,
3 fmt::Debug,
4 mem,
5 net::SocketAddr,
6 sync::Arc,
7 thread,
8 time::{Duration, Instant},
9};
10
11use azalea_auth::game_profile::GameProfile;
12use azalea_core::{
13 data_registry::ResolvableDataRegistry, identifier::Identifier, position::Vec3, tick::GameTick,
14};
15use azalea_entity::{
16 EntityUpdateSystems, PlayerAbilities, Position,
17 dimensions::EntityDimensions,
18 indexing::{EntityIdIndex, EntityUuidIndex},
19 metadata::Health,
20};
21use azalea_physics::local_player::PhysicsState;
22use azalea_protocol::{
23 ServerAddress,
24 connect::Proxy,
25 packets::{Packet, game::ServerboundGamePacket},
26 resolve,
27};
28use azalea_world::{Instance, InstanceContainer, InstanceName, MinecraftEntityId, PartialInstance};
29use bevy_app::{App, AppExit, Plugin, PluginsState, SubApp, Update};
30use bevy_ecs::{
31 message::MessageCursor,
32 prelude::*,
33 schedule::{InternedScheduleLabel, LogLevel, ScheduleBuildSettings},
34};
35use parking_lot::{Mutex, RwLock};
36use simdnbt::owned::NbtCompound;
37use thiserror::Error;
38use tokio::{
39 sync::{
40 mpsc::{self},
41 oneshot,
42 },
43 time,
44};
45use tracing::{info, warn};
46use uuid::Uuid;
47
48use crate::{
49 Account, DefaultPlugins,
50 attack::{self},
51 block_update::QueuedServerBlockUpdates,
52 chunks::ChunkBatchInfo,
53 connection::RawConnection,
54 disconnect::DisconnectEvent,
55 events::Event,
56 interact::BlockStatePredictionHandler,
57 inventory::Inventory,
58 join::{ConnectOpts, StartJoinServerEvent},
59 local_player::{Hunger, InstanceHolder, PermissionLevel, TabList},
60 mining::{self},
61 movement::LastSentLookDirection,
62 packet::game::SendGamePacketEvent,
63 player::{GameProfileComponent, PlayerInfo, retroactively_add_game_profile_component},
64};
65
66#[derive(Clone)]
76pub struct Client {
77 pub entity: Entity,
79
80 pub ecs: Arc<Mutex<World>>,
86}
87
88#[derive(Error, Debug)]
90pub enum JoinError {
91 #[error(transparent)]
92 Resolver(#[from] resolve::ResolveError),
93 #[error("The given address could not be parsed into a ServerAddress")]
94 InvalidAddress,
95}
96
97pub struct StartClientOpts {
98 pub ecs_lock: Arc<Mutex<World>>,
99 pub account: Account,
100 pub connect_opts: ConnectOpts,
101 pub event_sender: Option<mpsc::UnboundedSender<Event>>,
102}
103
104impl StartClientOpts {
105 pub fn new(
106 account: Account,
107 address: ServerAddress,
108 resolved_address: SocketAddr,
109 event_sender: Option<mpsc::UnboundedSender<Event>>,
110 ) -> StartClientOpts {
111 let mut app = App::new();
112 app.add_plugins(DefaultPlugins);
113
114 let (ecs_lock, start_running_systems, _appexit_rx) = start_ecs_runner(app.main_mut());
117 start_running_systems();
118
119 Self {
120 ecs_lock,
121 account,
122 connect_opts: ConnectOpts {
123 address,
124 resolved_address,
125 proxy: None,
126 },
127 event_sender,
128 }
129 }
130
131 pub fn proxy(mut self, proxy: Proxy) -> Self {
132 self.connect_opts.proxy = Some(proxy);
133 self
134 }
135}
136
137impl Client {
138 pub fn new(entity: Entity, ecs: Arc<Mutex<World>>) -> Self {
143 Self {
144 entity,
146
147 ecs,
148 }
149 }
150
151 pub async fn join(
172 account: Account,
173 address: impl TryInto<ServerAddress>,
174 ) -> Result<(Self, mpsc::UnboundedReceiver<Event>), JoinError> {
175 let address: ServerAddress = address.try_into().map_err(|_| JoinError::InvalidAddress)?;
176 let resolved_address = resolve::resolve_address(&address).await?;
177 let (tx, rx) = mpsc::unbounded_channel();
178
179 let client = Self::start_client(StartClientOpts::new(
180 account,
181 address,
182 resolved_address,
183 Some(tx),
184 ))
185 .await;
186 Ok((client, rx))
187 }
188
189 pub async fn join_with_proxy(
190 account: Account,
191 address: impl TryInto<ServerAddress>,
192 proxy: Proxy,
193 ) -> Result<(Self, mpsc::UnboundedReceiver<Event>), JoinError> {
194 let address: ServerAddress = address.try_into().map_err(|_| JoinError::InvalidAddress)?;
195 let resolved_address = resolve::resolve_address(&address).await?;
196 let (tx, rx) = mpsc::unbounded_channel();
197
198 let client = Self::start_client(
199 StartClientOpts::new(account, address, resolved_address, Some(tx)).proxy(proxy),
200 )
201 .await;
202 Ok((client, rx))
203 }
204
205 pub async fn start_client(
208 StartClientOpts {
209 ecs_lock,
210 account,
211 connect_opts,
212 event_sender,
213 }: StartClientOpts,
214 ) -> Self {
215 let (start_join_callback_tx, mut start_join_callback_rx) =
218 mpsc::unbounded_channel::<Entity>();
219
220 ecs_lock.lock().write_message(StartJoinServerEvent {
221 account,
222 connect_opts,
223 event_sender,
224 start_join_callback_tx: Some(start_join_callback_tx),
225 });
226
227 let entity = start_join_callback_rx.recv().await.expect(
228 "start_join_callback should not be dropped before sending a message, this is a bug in Azalea",
229 );
230
231 Client::new(entity, ecs_lock)
232 }
233
234 pub fn write_packet(&self, packet: impl Packet<ServerboundGamePacket>) {
236 let packet = packet.into_variant();
237 self.ecs
238 .lock()
239 .commands()
240 .trigger(SendGamePacketEvent::new(self.entity, packet));
241 }
242
243 pub fn disconnect(&self) {
248 self.ecs.lock().write_message(DisconnectEvent {
249 entity: self.entity,
250 reason: None,
251 });
252 }
253
254 pub fn with_raw_connection<R>(&self, f: impl FnOnce(&RawConnection) -> R) -> R {
255 self.query_self::<&RawConnection, _>(f)
256 }
257 pub fn with_raw_connection_mut<R>(&self, f: impl FnOnce(Mut<'_, RawConnection>) -> R) -> R {
258 self.query_self::<&mut RawConnection, _>(f)
259 }
260
261 pub fn component<T: Component + Clone>(&self) -> T {
285 self.query_self::<&T, _>(|t| t.clone())
286 }
287
288 pub fn get_component<T: Component + Clone>(&self) -> Option<T> {
295 self.query_self::<Option<&T>, _>(|t| t.cloned())
296 }
297
298 pub fn resource<T: Resource + Clone>(&self) -> T {
300 self.ecs.lock().resource::<T>().clone()
301 }
302
303 pub fn map_resource<T: Resource, R>(&self, f: impl FnOnce(&T) -> R) -> R {
305 let ecs = self.ecs.lock();
306 let value = ecs.resource::<T>();
307 f(value)
308 }
309
310 pub fn map_get_resource<T: Resource, R>(&self, f: impl FnOnce(Option<&T>) -> R) -> R {
312 let ecs = self.ecs.lock();
313 let value = ecs.get_resource::<T>();
314 f(value)
315 }
316
317 pub fn world(&self) -> Arc<RwLock<Instance>> {
324 let instance_holder = self.component::<InstanceHolder>();
325 instance_holder.instance.clone()
326 }
327
328 pub fn partial_world(&self) -> Arc<RwLock<PartialInstance>> {
338 let instance_holder = self.component::<InstanceHolder>();
339 instance_holder.partial_instance.clone()
340 }
341
342 pub fn logged_in(&self) -> bool {
344 self.query_self::<Option<&InstanceName>, _>(|ins| ins.is_some())
346 }
347}
348
349impl Client {
350 pub fn position(&self) -> Vec3 {
358 Vec3::from(
359 &self
360 .get_component::<Position>()
361 .expect("the client's position hasn't been initialized yet"),
362 )
363 }
364
365 pub fn dimensions(&self) -> EntityDimensions {
371 self.component::<EntityDimensions>()
372 }
373
374 pub fn eye_position(&self) -> Vec3 {
379 self.query_self::<(&Position, &EntityDimensions), _>(|(pos, dim)| {
380 pos.up(dim.eye_height as f64)
381 })
382 }
383
384 pub fn health(&self) -> f32 {
388 *self.component::<Health>()
389 }
390
391 pub fn hunger(&self) -> Hunger {
396 self.component::<Hunger>().to_owned()
397 }
398
399 pub fn username(&self) -> String {
404 self.profile().name.to_owned()
405 }
406
407 pub fn uuid(&self) -> Uuid {
411 self.profile().uuid
412 }
413
414 pub fn tab_list(&self) -> HashMap<Uuid, PlayerInfo> {
418 (*self.component::<TabList>()).clone()
419 }
420
421 pub fn profile(&self) -> GameProfile {
431 (*self.component::<GameProfileComponent>()).clone()
432 }
433
434 pub fn player_uuid_by_username(&self, username: &str) -> Option<Uuid> {
440 self.tab_list()
441 .values()
442 .find(|player| player.profile.name == username)
443 .map(|player| player.profile.uuid)
444 }
445
446 pub fn entity_by_uuid(&self, uuid: Uuid) -> Option<Entity> {
449 self.map_resource::<EntityUuidIndex, _>(|entity_uuid_index| entity_uuid_index.get(&uuid))
450 }
451
452 pub fn minecraft_entity_by_ecs_entity(&self, entity: Entity) -> Option<MinecraftEntityId> {
454 self.query_self::<&EntityIdIndex, _>(|entity_id_index| {
455 entity_id_index.get_by_ecs_entity(entity)
456 })
457 }
458 pub fn ecs_entity_by_minecraft_entity(&self, entity: MinecraftEntityId) -> Option<Entity> {
460 self.query_self::<&EntityIdIndex, _>(|entity_id_index| {
461 entity_id_index.get_by_minecraft_entity(entity)
462 })
463 }
464
465 pub fn with_registry_holder<R>(
473 &self,
474 f: impl FnOnce(&azalea_core::registry_holder::RegistryHolder) -> R,
475 ) -> R {
476 let instance = self.world();
477 let registries = &instance.read().registries;
478 f(registries)
479 }
480
481 pub fn resolve_registry_name(
487 &self,
488 registry: &impl ResolvableDataRegistry,
489 ) -> Option<Identifier> {
490 self.with_registry_holder(|registries| registry.resolve_name(registries))
491 }
492 pub fn with_resolved_registry<R>(
502 &self,
503 registry: impl ResolvableDataRegistry,
504 f: impl FnOnce(&Identifier, &NbtCompound) -> R,
505 ) -> Option<R> {
506 self.with_registry_holder(|registries| {
507 registry
508 .resolve(registries)
509 .map(|(name, data)| f(name, data))
510 })
511 }
512}
513
514#[derive(Bundle)]
520pub struct LocalPlayerBundle {
521 pub raw_connection: RawConnection,
522 pub instance_holder: InstanceHolder,
523
524 pub metadata: azalea_entity::metadata::PlayerMetadataBundle,
525}
526
527#[derive(Bundle, Default)]
532pub struct JoinedClientBundle {
533 pub physics_state: PhysicsState,
535 pub inventory: Inventory,
536 pub tab_list: TabList,
537 pub block_state_prediction_handler: BlockStatePredictionHandler,
538 pub queued_server_block_updates: QueuedServerBlockUpdates,
539 pub last_sent_direction: LastSentLookDirection,
540 pub abilities: PlayerAbilities,
541 pub permission_level: PermissionLevel,
542 pub chunk_batch_info: ChunkBatchInfo,
543 pub hunger: Hunger,
544
545 pub entity_id_index: EntityIdIndex,
546
547 pub mining: mining::MineBundle,
548 pub attack: attack::AttackBundle,
549
550 pub in_game_state: InGameState,
551}
552
553#[derive(Component, Clone, Debug, Default)]
556pub struct InGameState;
557#[derive(Component, Clone, Debug, Default)]
560pub struct InConfigState;
561
562pub struct AzaleaPlugin;
563impl Plugin for AzaleaPlugin {
564 fn build(&self, app: &mut App) {
565 app.add_systems(
566 Update,
567 (
568 retroactively_add_game_profile_component
570 .after(EntityUpdateSystems::Index)
571 .after(crate::join::handle_start_join_server_event),
572 ),
573 )
574 .init_resource::<InstanceContainer>()
575 .init_resource::<TabList>();
576 }
577}
578
579#[doc(hidden)]
586pub fn start_ecs_runner(
587 app: &mut SubApp,
588) -> (Arc<Mutex<World>>, impl FnOnce(), oneshot::Receiver<AppExit>) {
589 if app.plugins_state() != PluginsState::Cleaned {
592 if app.plugins_state() == PluginsState::Adding {
594 info!("Waiting for plugins to load ...");
595 while app.plugins_state() == PluginsState::Adding {
596 thread::yield_now();
597 }
598 }
599 app.finish();
601 app.cleanup();
602 }
603
604 let ecs = Arc::new(Mutex::new(mem::take(app.world_mut())));
607
608 let ecs_clone = ecs.clone();
609 let outer_schedule_label = *app.update_schedule.as_ref().unwrap();
610
611 let (appexit_tx, appexit_rx) = oneshot::channel();
612 let start_running_systems = move || {
613 tokio::spawn(async move {
614 let appexit = run_schedule_loop(ecs_clone, outer_schedule_label).await;
615 appexit_tx.send(appexit)
616 });
617 };
618
619 (ecs, start_running_systems, appexit_rx)
620}
621
622async fn run_schedule_loop(
627 ecs: Arc<Mutex<World>>,
628 outer_schedule_label: InternedScheduleLabel,
629) -> AppExit {
630 let mut last_update: Option<Instant> = None;
631 let mut last_tick: Option<Instant> = None;
632
633 const UPDATE_DURATION_TARGET: Duration = Duration::from_micros(1_000_000 / 60);
637 const GAME_TICK_DURATION_TARGET: Duration = Duration::from_micros(1_000_000 / 20);
639
640 loop {
641 let now = Instant::now();
643 if let Some(last_update) = last_update {
644 let elapsed = now.duration_since(last_update);
645 if elapsed < UPDATE_DURATION_TARGET {
646 time::sleep(UPDATE_DURATION_TARGET - elapsed).await;
647 }
648 }
649 last_update = Some(now);
650
651 let mut ecs = ecs.lock();
652
653 ecs.run_schedule(outer_schedule_label);
655 if last_tick
656 .map(|last_tick| last_tick.elapsed() > GAME_TICK_DURATION_TARGET)
657 .unwrap_or(true)
658 {
659 if let Some(last_tick) = &mut last_tick {
660 *last_tick += GAME_TICK_DURATION_TARGET;
661
662 if (now - *last_tick) > GAME_TICK_DURATION_TARGET * 10 {
665 warn!(
666 "GameTick is more than 10 ticks behind, skipping ticks so we don't have to burst too much"
667 );
668 *last_tick = now;
669 }
670 } else {
671 last_tick = Some(now);
672 }
673 ecs.run_schedule(GameTick);
674 }
675
676 ecs.clear_trackers();
677 if let Some(exit) = should_exit(&mut ecs) {
678 ecs.clear_all();
680 return exit;
684 }
685 }
686}
687
688fn should_exit(ecs: &mut World) -> Option<AppExit> {
692 let mut reader = MessageCursor::default();
693
694 let events = ecs.get_resource::<Messages<AppExit>>()?;
695 let mut events = reader.read(events);
696
697 if events.len() != 0 {
698 return Some(
699 events
700 .find(|exit| exit.is_error())
701 .cloned()
702 .unwrap_or(AppExit::Success),
703 );
704 }
705
706 None
707}
708
709pub struct AmbiguityLoggerPlugin;
710impl Plugin for AmbiguityLoggerPlugin {
711 fn build(&self, app: &mut App) {
712 app.edit_schedule(Update, |schedule| {
713 schedule.set_build_settings(ScheduleBuildSettings {
714 ambiguity_detection: LogLevel::Warn,
715 ..Default::default()
716 });
717 });
718 app.edit_schedule(GameTick, |schedule| {
719 schedule.set_build_settings(ScheduleBuildSettings {
720 ambiguity_detection: LogLevel::Warn,
721 ..Default::default()
722 });
723 });
724 }
725}