azalea_client/plugins/
join.rs1use std::{net::SocketAddr, sync::Arc};
2
3use azalea_entity::{LocalEntity, indexing::EntityUuidIndex};
4use azalea_protocol::{
5 ServerAddress,
6 connect::{Connection, ConnectionError, Proxy},
7 packets::{
8 ClientIntention, ConnectionProtocol, PROTOCOL_VERSION,
9 handshake::ServerboundIntention,
10 login::{ClientboundLoginPacket, ServerboundHello, ServerboundLoginPacket},
11 },
12};
13use azalea_world::Instance;
14use bevy_app::prelude::*;
15use bevy_ecs::prelude::*;
16use bevy_tasks::{IoTaskPool, Task, futures_lite::future};
17use parking_lot::RwLock;
18use tokio::sync::mpsc;
19use tracing::{debug, warn};
20
21use super::events::LocalPlayerEvents;
22use crate::{
23 Account, JoinError, LocalPlayerBundle,
24 connection::RawConnection,
25 packet::login::{InLoginState, SendLoginPacketEvent},
26};
27
28pub struct JoinPlugin;
30impl Plugin for JoinPlugin {
31 fn build(&self, app: &mut App) {
32 app.add_event::<StartJoinServerEvent>().add_systems(
33 Update,
34 (handle_start_join_server_event, poll_create_connection_task),
35 );
36 }
37}
38
39#[derive(Event, Debug)]
40pub struct StartJoinServerEvent {
41 pub account: Account,
42 pub address: ServerAddress,
43 pub resolved_address: SocketAddr,
44 pub proxy: Option<Proxy>,
45 pub event_sender: Option<mpsc::UnboundedSender<crate::Event>>,
46
47 pub start_join_callback_tx: Option<StartJoinCallback>,
48}
49
50#[derive(Component, Debug, Clone)]
53pub struct StartJoinCallback(pub mpsc::UnboundedSender<Result<Entity, JoinError>>);
54
55pub fn handle_start_join_server_event(
56 mut commands: Commands,
57 mut events: EventReader<StartJoinServerEvent>,
58 mut entity_uuid_index: ResMut<EntityUuidIndex>,
59) {
60 for event in events.read() {
61 let uuid = event.account.uuid_or_offline();
62 let entity = if let Some(entity) = entity_uuid_index.get(&uuid) {
63 debug!("Reusing entity {entity:?} for client");
64 entity
65 } else {
66 let entity = commands.spawn_empty().id();
67 debug!("Created new entity {entity:?} for client");
68 entity_uuid_index.insert(uuid, entity);
70 entity
71 };
72
73 let mut entity_mut = commands.entity(entity);
74 entity_mut.insert((
75 event.account.to_owned(),
77 LocalEntity,
80 ));
84
85 if let Some(event_sender) = &event.event_sender {
86 entity_mut.insert(LocalPlayerEvents(event_sender.clone()));
89 }
90 if let Some(start_join_callback) = &event.start_join_callback_tx {
91 entity_mut.insert(start_join_callback.clone());
92 }
93
94 let task_pool = IoTaskPool::get();
95 let resolved_addr = event.resolved_address;
96 let address = event.address.clone();
97 let proxy = event.proxy.clone();
98 let task = task_pool.spawn(async_compat::Compat::new(
99 create_conn_and_send_intention_packet(resolved_addr, address, proxy),
100 ));
101
102 entity_mut.insert(CreateConnectionTask(task));
103 }
104}
105
106async fn create_conn_and_send_intention_packet(
107 resolved_addr: SocketAddr,
108 address: ServerAddress,
109 proxy: Option<Proxy>,
110) -> Result<LoginConn, ConnectionError> {
111 let mut conn = if let Some(proxy) = proxy {
112 Connection::new_with_proxy(&resolved_addr, proxy).await?
113 } else {
114 Connection::new(&resolved_addr).await?
115 };
116
117 conn.write(ServerboundIntention {
118 protocol_version: PROTOCOL_VERSION,
119 hostname: address.host.clone(),
120 port: address.port,
121 intention: ClientIntention::Login,
122 })
123 .await?;
124
125 let conn = conn.login();
126
127 Ok(conn)
128}
129
130type LoginConn = Connection<ClientboundLoginPacket, ServerboundLoginPacket>;
131
132#[derive(Component)]
133pub struct CreateConnectionTask(pub Task<Result<LoginConn, ConnectionError>>);
134
135pub fn poll_create_connection_task(
136 mut commands: Commands,
137 mut query: Query<(
138 Entity,
139 &mut CreateConnectionTask,
140 &Account,
141 Option<&StartJoinCallback>,
142 )>,
143) {
144 for (entity, mut task, account, mut start_join_callback) in query.iter_mut() {
145 if let Some(poll_res) = future::block_on(future::poll_once(&mut task.0)) {
146 let mut entity_mut = commands.entity(entity);
147 entity_mut.remove::<CreateConnectionTask>();
148 let conn = match poll_res {
149 Ok(conn) => conn,
150 Err(err) => {
151 warn!("failed to create connection: {err}");
152 if let Some(cb) = start_join_callback.take() {
153 let _ = cb.0.send(Err(err.into()));
154 }
155 return;
156 }
157 };
158
159 let (read_conn, write_conn) = conn.into_split();
160 let (read_conn, write_conn) = (read_conn.raw, write_conn.raw);
161
162 let instance = Instance::default();
163 let instance_holder = crate::local_player::InstanceHolder::new(
164 entity,
165 Arc::new(RwLock::new(instance)),
168 );
169
170 entity_mut.insert((
171 LocalPlayerBundle {
173 raw_connection: RawConnection::new(
174 read_conn,
175 write_conn,
176 ConnectionProtocol::Login,
177 ),
178 client_information: crate::ClientInformation::default(),
179 instance_holder,
180 metadata: azalea_entity::metadata::PlayerMetadataBundle::default(),
181 },
182 InLoginState,
183 ));
184
185 commands.trigger(SendLoginPacketEvent::new(
186 entity,
187 ServerboundHello {
188 name: account.username.clone(),
189 profile_id: account.uuid_or_offline(),
190 },
191 ));
192
193 if let Some(cb) = start_join_callback.take() {
194 let _ = cb.0.send(Ok(entity));
195 }
196 }
197 }
198}