azalea_client/plugins/
join.rs1use std::sync::Arc;
2
3use azalea_entity::{LocalEntity, indexing::EntityUuidIndex};
4use azalea_protocol::{
5 address::ResolvedAddr,
6 common::client_information::ClientInformation,
7 connect::{Connection, ConnectionError, Proxy},
8 packets::{
9 ClientIntention, ConnectionProtocol, PROTOCOL_VERSION,
10 handshake::ServerboundIntention,
11 login::{ClientboundLoginPacket, ServerboundHello, ServerboundLoginPacket},
12 },
13};
14use azalea_world::Instance;
15use bevy_app::prelude::*;
16use bevy_ecs::prelude::*;
17use bevy_tasks::{IoTaskPool, Task, futures_lite::future};
18use parking_lot::RwLock;
19use tokio::sync::mpsc;
20use tracing::{debug, warn};
21
22use crate::{
23 Account, LocalPlayerBundle,
24 connection::RawConnection,
25 packet::login::{InLoginState, SendLoginPacketEvent},
26};
27
28pub struct JoinPlugin;
30impl Plugin for JoinPlugin {
31 fn build(&self, app: &mut App) {
32 app.add_message::<StartJoinServerEvent>()
33 .add_message::<ConnectionFailedEvent>()
34 .add_systems(
35 Update,
36 (
37 handle_start_join_server_event.before(super::login::poll_auth_task),
38 poll_create_connection_task,
39 )
40 .chain(),
41 );
42 }
43}
44
45#[derive(Debug, Message)]
50pub struct StartJoinServerEvent {
51 pub account: Account,
52 pub connect_opts: ConnectOpts,
53
54 pub start_join_callback_tx: Option<mpsc::UnboundedSender<Entity>>,
56}
57
58#[derive(Clone, Component, Debug)]
63pub struct ConnectOpts {
64 pub address: ResolvedAddr,
65 pub server_proxy: Option<Proxy>,
67 pub sessionserver_proxy: Option<Proxy>,
76}
77
78#[derive(Message)]
85pub struct ConnectionFailedEvent {
86 pub entity: Entity,
87 pub error: ConnectionError,
88}
89
90pub fn handle_start_join_server_event(
91 mut commands: Commands,
92 mut events: MessageReader<StartJoinServerEvent>,
93 mut entity_uuid_index: ResMut<EntityUuidIndex>,
94 connection_query: Query<&RawConnection>,
95) {
96 for event in events.read() {
97 let uuid = event.account.uuid_or_offline();
98 let entity = if let Some(entity) = entity_uuid_index.get(&uuid) {
99 debug!("Reusing entity {entity:?} for client");
100
101 if let Ok(conn) = connection_query.get(entity)
103 && conn.is_alive()
104 {
105 if let Some(start_join_callback_tx) = &event.start_join_callback_tx {
106 warn!(
107 "Received StartJoinServerEvent for {entity:?} but it's already connected. Ignoring the event but replying with Ok."
108 );
109 let _ = start_join_callback_tx.send(entity);
110 } else {
111 warn!(
112 "Received StartJoinServerEvent for {entity:?} but it's already connected. Ignoring the event."
113 );
114 }
115 return;
116 }
117
118 entity
119 } else {
120 let entity = commands.spawn_empty().id();
121 debug!("Created new entity {entity:?} for client");
122 entity_uuid_index.insert(uuid, entity);
124 entity
125 };
126
127 if let Some(start_join_callback) = &event.start_join_callback_tx {
128 let _ = start_join_callback.send(entity);
129 }
130
131 let mut entity_mut = commands.entity(entity);
132
133 entity_mut.insert((
134 event.account.to_owned(),
136 LocalEntity,
139 ClientInformation::default(),
141 event.connect_opts.clone(),
143 ));
147
148 let task_pool = IoTaskPool::get();
149 let connect_opts = event.connect_opts.clone();
150 let task = task_pool.spawn(async_compat::Compat::new(
151 create_conn_and_send_intention_packet(connect_opts),
152 ));
153
154 entity_mut.insert(CreateConnectionTask(task));
155 }
156}
157
158async fn create_conn_and_send_intention_packet(
159 opts: ConnectOpts,
160) -> Result<LoginConn, ConnectionError> {
161 let mut conn = if let Some(proxy) = opts.server_proxy {
162 Connection::new_with_proxy(&opts.address.socket, proxy).await?
163 } else {
164 Connection::new(&opts.address.socket).await?
165 };
166
167 conn.write(ServerboundIntention {
168 protocol_version: PROTOCOL_VERSION,
169 hostname: opts.address.server.host.clone(),
170 port: opts.address.server.port,
171 intention: ClientIntention::Login,
172 })
173 .await?;
174
175 let conn = conn.login();
176
177 Ok(conn)
178}
179
180type LoginConn = Connection<ClientboundLoginPacket, ServerboundLoginPacket>;
181
182#[derive(Component)]
183pub struct CreateConnectionTask(pub Task<Result<LoginConn, ConnectionError>>);
184
185pub fn poll_create_connection_task(
186 mut commands: Commands,
187 mut query: Query<(Entity, &mut CreateConnectionTask, &Account)>,
188 mut connection_failed_events: MessageWriter<ConnectionFailedEvent>,
189) {
190 for (entity, mut task, account) in query.iter_mut() {
191 if let Some(poll_res) = future::block_on(future::poll_once(&mut task.0)) {
192 let mut entity_mut = commands.entity(entity);
193 entity_mut.remove::<CreateConnectionTask>();
194 let conn = match poll_res {
195 Ok(conn) => conn,
196 Err(error) => {
197 warn!("failed to create connection: {error}");
198 connection_failed_events.write(ConnectionFailedEvent { entity, error });
199 return;
200 }
201 };
202
203 let (read_conn, write_conn) = conn.into_split();
204 let (read_conn, write_conn) = (read_conn.raw, write_conn.raw);
205
206 let instance = Instance::default();
207 let instance_holder = crate::local_player::InstanceHolder::new(
208 entity,
209 Arc::new(RwLock::new(instance)),
212 );
213
214 entity_mut.insert((
215 LocalPlayerBundle {
217 raw_connection: RawConnection::new(
218 read_conn,
219 write_conn,
220 ConnectionProtocol::Login,
221 ),
222 instance_holder,
223 metadata: azalea_entity::metadata::PlayerMetadataBundle::default(),
224 },
225 InLoginState,
226 ));
227
228 commands.trigger(SendLoginPacketEvent::new(
229 entity,
230 ServerboundHello {
231 name: account.username.clone(),
232 profile_id: account.uuid_or_offline(),
233 },
234 ));
235 }
236 }
237}