azalea_client/plugins/
join.rs1use std::sync::Arc;
2
3use azalea_entity::{LocalEntity, indexing::EntityUuidIndex};
4use azalea_protocol::{
5 address::ResolvedAddr,
6 common::client_information::ClientInformation,
7 connect::{Connection, ConnectionError, Proxy},
8 packets::{
9 ClientIntention, ConnectionProtocol, PROTOCOL_VERSION,
10 handshake::ServerboundIntention,
11 login::{ClientboundLoginPacket, ServerboundHello, ServerboundLoginPacket},
12 },
13};
14use azalea_world::World;
15use bevy_app::prelude::*;
16use bevy_ecs::prelude::*;
17use bevy_tasks::{IoTaskPool, Task, futures_lite::future};
18use parking_lot::RwLock;
19use tokio::sync::mpsc;
20use tracing::{debug, warn};
21
22use crate::{
23 LocalPlayerBundle,
24 account::Account,
25 connection::RawConnection,
26 local_player::WorldHolder,
27 packet::login::{InLoginState, SendLoginPacketEvent},
28};
29
30pub struct JoinPlugin;
32impl Plugin for JoinPlugin {
33 fn build(&self, app: &mut App) {
34 app.add_message::<StartJoinServerEvent>()
35 .add_message::<ConnectionFailedEvent>()
36 .add_systems(
37 Update,
38 (
39 handle_start_join_server_event.before(super::login::poll_auth_task),
40 poll_create_connection_task,
41 )
42 .chain(),
43 );
44 }
45}
46
47#[derive(Debug, Message)]
52pub struct StartJoinServerEvent {
53 pub account: Account,
54 pub connect_opts: ConnectOpts,
55
56 pub start_join_callback_tx: Option<mpsc::UnboundedSender<Entity>>,
58}
59
60#[derive(Clone, Component, Debug)]
65pub struct ConnectOpts {
66 pub address: ResolvedAddr,
67 pub server_proxy: Option<Proxy>,
69 pub sessionserver_proxy: Option<Proxy>,
78}
79
80#[derive(Message)]
87pub struct ConnectionFailedEvent {
88 pub entity: Entity,
89 pub error: Arc<ConnectionError>,
91}
92
93pub fn handle_start_join_server_event(
94 mut commands: Commands,
95 mut events: MessageReader<StartJoinServerEvent>,
96 mut entity_uuid_index: ResMut<EntityUuidIndex>,
97 connection_query: Query<&RawConnection>,
98) {
99 for event in events.read() {
100 let uuid = event.account.uuid();
101 let entity = if let Some(entity) = entity_uuid_index.get(&uuid) {
102 debug!("Reusing entity {entity:?} for client");
103
104 if let Ok(conn) = connection_query.get(entity)
106 && conn.is_alive()
107 {
108 if let Some(start_join_callback_tx) = &event.start_join_callback_tx {
109 warn!(
110 "Received StartJoinServerEvent for {entity:?} but it's already connected. Ignoring the event but replying with Ok."
111 );
112 let _ = start_join_callback_tx.send(entity);
113 } else {
114 warn!(
115 "Received StartJoinServerEvent for {entity:?} but it's already connected. Ignoring the event."
116 );
117 }
118 return;
119 }
120
121 entity
122 } else {
123 let entity = commands.spawn_empty().id();
124 debug!("Created new entity {entity:?} for client");
125 entity_uuid_index.insert(uuid, entity);
127 entity
128 };
129
130 if let Some(start_join_callback) = &event.start_join_callback_tx {
131 let _ = start_join_callback.send(entity);
132 }
133
134 let mut entity_mut = commands.entity(entity);
135
136 entity_mut.insert((
137 event.account.to_owned(),
139 LocalEntity,
142 ClientInformation::default(),
144 event.connect_opts.clone(),
146 ));
150
151 let task_pool = IoTaskPool::get();
152 let connect_opts = event.connect_opts.clone();
153 let task = task_pool.spawn(async_compat::Compat::new(
154 create_conn_and_send_intention_packet(connect_opts),
155 ));
156
157 entity_mut.insert(CreateConnectionTask(task));
158 }
159}
160
161async fn create_conn_and_send_intention_packet(
162 opts: ConnectOpts,
163) -> Result<LoginConn, ConnectionError> {
164 let mut conn = if let Some(proxy) = opts.server_proxy {
165 Connection::new_with_proxy(&opts.address.socket, proxy).await?
166 } else {
167 Connection::new(&opts.address.socket).await?
168 };
169
170 conn.write(ServerboundIntention {
171 protocol_version: PROTOCOL_VERSION,
172 hostname: opts.address.server.host.clone(),
173 port: opts.address.server.port,
174 intention: ClientIntention::Login,
175 })
176 .await?;
177
178 let conn = conn.login();
179
180 Ok(conn)
181}
182
183type LoginConn = Connection<ClientboundLoginPacket, ServerboundLoginPacket>;
184
185#[derive(Component)]
186pub struct CreateConnectionTask(pub Task<Result<LoginConn, ConnectionError>>);
187
188pub fn poll_create_connection_task(
189 mut commands: Commands,
190 mut query: Query<(Entity, &mut CreateConnectionTask, &Account)>,
191 mut connection_failed_events: MessageWriter<ConnectionFailedEvent>,
192) {
193 for (entity, mut task, account) in query.iter_mut() {
194 if let Some(poll_res) = future::block_on(future::poll_once(&mut task.0)) {
195 let mut entity_mut = commands.entity(entity);
196 entity_mut.remove::<CreateConnectionTask>();
197 let conn = match poll_res {
198 Ok(conn) => conn,
199 Err(error) => {
200 warn!("failed to create connection: {error}");
201 connection_failed_events.write(ConnectionFailedEvent {
202 entity,
203 error: Arc::new(error),
204 });
205 return;
206 }
207 };
208
209 let (read_conn, write_conn) = conn.into_split();
210 let (read_conn, write_conn) = (read_conn.raw, write_conn.raw);
211
212 let world = World::default();
213 let world_holder = WorldHolder::new(
214 entity,
215 Arc::new(RwLock::new(world)),
218 );
219
220 entity_mut.insert((
221 LocalPlayerBundle {
223 raw_connection: RawConnection::new(
224 read_conn,
225 write_conn,
226 ConnectionProtocol::Login,
227 ),
228 world_holder,
229 metadata: azalea_entity::metadata::PlayerMetadataBundle::default(),
230 },
231 InLoginState,
232 ));
233
234 commands.trigger(SendLoginPacketEvent::new(
235 entity,
236 ServerboundHello {
237 name: account.username().to_owned(),
238 profile_id: account.uuid(),
239 },
240 ));
241 }
242 }
243}