azalea_client/plugins/
join.rs1use std::sync::Arc;
2
3use azalea_entity::{LocalEntity, indexing::EntityUuidIndex};
4use azalea_protocol::{
5 address::ResolvedAddr,
6 common::client_information::ClientInformation,
7 connect::{Connection, ConnectionError, Proxy},
8 packets::{
9 ClientIntention, ConnectionProtocol, PROTOCOL_VERSION,
10 handshake::ServerboundIntention,
11 login::{ClientboundLoginPacket, ServerboundHello, ServerboundLoginPacket},
12 },
13};
14use azalea_world::Instance;
15use bevy_app::prelude::*;
16use bevy_ecs::prelude::*;
17use bevy_tasks::{IoTaskPool, Task, futures_lite::future};
18use parking_lot::RwLock;
19use tokio::sync::mpsc;
20use tracing::{debug, warn};
21
22use crate::{
23 LocalPlayerBundle,
24 account::Account,
25 connection::RawConnection,
26 packet::login::{InLoginState, SendLoginPacketEvent},
27};
28
29pub struct JoinPlugin;
31impl Plugin for JoinPlugin {
32 fn build(&self, app: &mut App) {
33 app.add_message::<StartJoinServerEvent>()
34 .add_message::<ConnectionFailedEvent>()
35 .add_systems(
36 Update,
37 (
38 handle_start_join_server_event.before(super::login::poll_auth_task),
39 poll_create_connection_task,
40 )
41 .chain(),
42 );
43 }
44}
45
46#[derive(Debug, Message)]
51pub struct StartJoinServerEvent {
52 pub account: Account,
53 pub connect_opts: ConnectOpts,
54
55 pub start_join_callback_tx: Option<mpsc::UnboundedSender<Entity>>,
57}
58
59#[derive(Clone, Component, Debug)]
64pub struct ConnectOpts {
65 pub address: ResolvedAddr,
66 pub server_proxy: Option<Proxy>,
68 pub sessionserver_proxy: Option<Proxy>,
77}
78
79#[derive(Message)]
86pub struct ConnectionFailedEvent {
87 pub entity: Entity,
88 pub error: ConnectionError,
89}
90
91pub fn handle_start_join_server_event(
92 mut commands: Commands,
93 mut events: MessageReader<StartJoinServerEvent>,
94 mut entity_uuid_index: ResMut<EntityUuidIndex>,
95 connection_query: Query<&RawConnection>,
96) {
97 for event in events.read() {
98 let uuid = event.account.uuid();
99 let entity = if let Some(entity) = entity_uuid_index.get(&uuid) {
100 debug!("Reusing entity {entity:?} for client");
101
102 if let Ok(conn) = connection_query.get(entity)
104 && conn.is_alive()
105 {
106 if let Some(start_join_callback_tx) = &event.start_join_callback_tx {
107 warn!(
108 "Received StartJoinServerEvent for {entity:?} but it's already connected. Ignoring the event but replying with Ok."
109 );
110 let _ = start_join_callback_tx.send(entity);
111 } else {
112 warn!(
113 "Received StartJoinServerEvent for {entity:?} but it's already connected. Ignoring the event."
114 );
115 }
116 return;
117 }
118
119 entity
120 } else {
121 let entity = commands.spawn_empty().id();
122 debug!("Created new entity {entity:?} for client");
123 entity_uuid_index.insert(uuid, entity);
125 entity
126 };
127
128 if let Some(start_join_callback) = &event.start_join_callback_tx {
129 let _ = start_join_callback.send(entity);
130 }
131
132 let mut entity_mut = commands.entity(entity);
133
134 entity_mut.insert((
135 event.account.to_owned(),
137 LocalEntity,
140 ClientInformation::default(),
142 event.connect_opts.clone(),
144 ));
148
149 let task_pool = IoTaskPool::get();
150 let connect_opts = event.connect_opts.clone();
151 let task = task_pool.spawn(async_compat::Compat::new(
152 create_conn_and_send_intention_packet(connect_opts),
153 ));
154
155 entity_mut.insert(CreateConnectionTask(task));
156 }
157}
158
159async fn create_conn_and_send_intention_packet(
160 opts: ConnectOpts,
161) -> Result<LoginConn, ConnectionError> {
162 let mut conn = if let Some(proxy) = opts.server_proxy {
163 Connection::new_with_proxy(&opts.address.socket, proxy).await?
164 } else {
165 Connection::new(&opts.address.socket).await?
166 };
167
168 conn.write(ServerboundIntention {
169 protocol_version: PROTOCOL_VERSION,
170 hostname: opts.address.server.host.clone(),
171 port: opts.address.server.port,
172 intention: ClientIntention::Login,
173 })
174 .await?;
175
176 let conn = conn.login();
177
178 Ok(conn)
179}
180
181type LoginConn = Connection<ClientboundLoginPacket, ServerboundLoginPacket>;
182
183#[derive(Component)]
184pub struct CreateConnectionTask(pub Task<Result<LoginConn, ConnectionError>>);
185
186pub fn poll_create_connection_task(
187 mut commands: Commands,
188 mut query: Query<(Entity, &mut CreateConnectionTask, &Account)>,
189 mut connection_failed_events: MessageWriter<ConnectionFailedEvent>,
190) {
191 for (entity, mut task, account) in query.iter_mut() {
192 if let Some(poll_res) = future::block_on(future::poll_once(&mut task.0)) {
193 let mut entity_mut = commands.entity(entity);
194 entity_mut.remove::<CreateConnectionTask>();
195 let conn = match poll_res {
196 Ok(conn) => conn,
197 Err(error) => {
198 warn!("failed to create connection: {error}");
199 connection_failed_events.write(ConnectionFailedEvent { entity, error });
200 return;
201 }
202 };
203
204 let (read_conn, write_conn) = conn.into_split();
205 let (read_conn, write_conn) = (read_conn.raw, write_conn.raw);
206
207 let instance = Instance::default();
208 let instance_holder = crate::local_player::InstanceHolder::new(
209 entity,
210 Arc::new(RwLock::new(instance)),
213 );
214
215 entity_mut.insert((
216 LocalPlayerBundle {
218 raw_connection: RawConnection::new(
219 read_conn,
220 write_conn,
221 ConnectionProtocol::Login,
222 ),
223 instance_holder,
224 metadata: azalea_entity::metadata::PlayerMetadataBundle::default(),
225 },
226 InLoginState,
227 ));
228
229 commands.trigger(SendLoginPacketEvent::new(
230 entity,
231 ServerboundHello {
232 name: account.username().to_owned(),
233 profile_id: account.uuid(),
234 },
235 ));
236 }
237 }
238}