azalea_client/plugins/
join.rs1use std::{net::SocketAddr, sync::Arc};
2
3use azalea_entity::{LocalEntity, indexing::EntityUuidIndex};
4use azalea_protocol::{
5 ServerAddress,
6 common::client_information::ClientInformation,
7 connect::{Connection, ConnectionError, Proxy},
8 packets::{
9 ClientIntention, ConnectionProtocol, PROTOCOL_VERSION,
10 handshake::ServerboundIntention,
11 login::{ClientboundLoginPacket, ServerboundHello, ServerboundLoginPacket},
12 },
13};
14use azalea_world::Instance;
15use bevy_app::prelude::*;
16use bevy_ecs::prelude::*;
17use bevy_tasks::{IoTaskPool, Task, futures_lite::future};
18use parking_lot::RwLock;
19use tokio::sync::mpsc;
20use tracing::{debug, warn};
21
22use super::events::LocalPlayerEvents;
23use crate::{
24 Account, LocalPlayerBundle,
25 connection::RawConnection,
26 packet::login::{InLoginState, SendLoginPacketEvent},
27};
28
29pub struct JoinPlugin;
31impl Plugin for JoinPlugin {
32 fn build(&self, app: &mut App) {
33 app.add_message::<StartJoinServerEvent>()
34 .add_message::<ConnectionFailedEvent>()
35 .add_systems(
36 Update,
37 (
38 handle_start_join_server_event.before(super::login::poll_auth_task),
39 poll_create_connection_task,
40 )
41 .chain(),
42 );
43 }
44}
45
46#[derive(Message, Debug)]
51pub struct StartJoinServerEvent {
52 pub account: Account,
53 pub connect_opts: ConnectOpts,
54 pub event_sender: Option<mpsc::UnboundedSender<crate::Event>>,
55
56 pub start_join_callback_tx: Option<mpsc::UnboundedSender<Entity>>,
58}
59
60#[derive(Debug, Clone, Component)]
66pub struct ConnectOpts {
67 pub address: ServerAddress,
68 pub resolved_address: SocketAddr,
69 pub proxy: Option<Proxy>,
70}
71
72#[derive(Message)]
79pub struct ConnectionFailedEvent {
80 pub entity: Entity,
81 pub error: ConnectionError,
82}
83
84pub fn handle_start_join_server_event(
85 mut commands: Commands,
86 mut events: MessageReader<StartJoinServerEvent>,
87 mut entity_uuid_index: ResMut<EntityUuidIndex>,
88 connection_query: Query<&RawConnection>,
89) {
90 for event in events.read() {
91 let uuid = event.account.uuid_or_offline();
92 let entity = if let Some(entity) = entity_uuid_index.get(&uuid) {
93 debug!("Reusing entity {entity:?} for client");
94
95 if let Ok(conn) = connection_query.get(entity)
97 && conn.is_alive()
98 {
99 if let Some(start_join_callback_tx) = &event.start_join_callback_tx {
100 warn!(
101 "Received StartJoinServerEvent for {entity:?} but it's already connected. Ignoring the event but replying with Ok."
102 );
103 let _ = start_join_callback_tx.send(entity);
104 } else {
105 warn!(
106 "Received StartJoinServerEvent for {entity:?} but it's already connected. Ignoring the event."
107 );
108 }
109 return;
110 }
111
112 entity
113 } else {
114 let entity = commands.spawn_empty().id();
115 debug!("Created new entity {entity:?} for client");
116 entity_uuid_index.insert(uuid, entity);
118 entity
119 };
120
121 if let Some(start_join_callback) = &event.start_join_callback_tx {
122 let _ = start_join_callback.send(entity);
123 }
124
125 let mut entity_mut = commands.entity(entity);
126
127 entity_mut.insert((
128 event.account.to_owned(),
130 LocalEntity,
133 ClientInformation::default(),
135 event.connect_opts.clone(),
137 ));
141
142 if let Some(event_sender) = &event.event_sender {
143 entity_mut.insert(LocalPlayerEvents(event_sender.clone()));
146 }
147
148 let task_pool = IoTaskPool::get();
149 let connect_opts = event.connect_opts.clone();
150 let task = task_pool.spawn(async_compat::Compat::new(
151 create_conn_and_send_intention_packet(connect_opts),
152 ));
153
154 entity_mut.insert(CreateConnectionTask(task));
155 }
156}
157
158async fn create_conn_and_send_intention_packet(
159 opts: ConnectOpts,
160) -> Result<LoginConn, ConnectionError> {
161 let mut conn = if let Some(proxy) = opts.proxy {
162 Connection::new_with_proxy(&opts.resolved_address, proxy).await?
163 } else {
164 Connection::new(&opts.resolved_address).await?
165 };
166
167 conn.write(ServerboundIntention {
168 protocol_version: PROTOCOL_VERSION,
169 hostname: opts.address.host.clone(),
170 port: opts.address.port,
171 intention: ClientIntention::Login,
172 })
173 .await?;
174
175 let conn = conn.login();
176
177 Ok(conn)
178}
179
180type LoginConn = Connection<ClientboundLoginPacket, ServerboundLoginPacket>;
181
182#[derive(Component)]
183pub struct CreateConnectionTask(pub Task<Result<LoginConn, ConnectionError>>);
184
185pub fn poll_create_connection_task(
186 mut commands: Commands,
187 mut query: Query<(Entity, &mut CreateConnectionTask, &Account)>,
188 mut connection_failed_events: MessageWriter<ConnectionFailedEvent>,
189) {
190 for (entity, mut task, account) in query.iter_mut() {
191 if let Some(poll_res) = future::block_on(future::poll_once(&mut task.0)) {
192 let mut entity_mut = commands.entity(entity);
193 entity_mut.remove::<CreateConnectionTask>();
194 let conn = match poll_res {
195 Ok(conn) => conn,
196 Err(error) => {
197 warn!("failed to create connection: {error}");
198 connection_failed_events.write(ConnectionFailedEvent { entity, error });
199 return;
200 }
201 };
202
203 let (read_conn, write_conn) = conn.into_split();
204 let (read_conn, write_conn) = (read_conn.raw, write_conn.raw);
205
206 let instance = Instance::default();
207 let instance_holder = crate::local_player::InstanceHolder::new(
208 entity,
209 Arc::new(RwLock::new(instance)),
212 );
213
214 entity_mut.insert((
215 LocalPlayerBundle {
217 raw_connection: RawConnection::new(
218 read_conn,
219 write_conn,
220 ConnectionProtocol::Login,
221 ),
222 instance_holder,
223 metadata: azalea_entity::metadata::PlayerMetadataBundle::default(),
224 },
225 InLoginState,
226 ));
227
228 commands.trigger(SendLoginPacketEvent::new(
229 entity,
230 ServerboundHello {
231 name: account.username.clone(),
232 profile_id: account.uuid_or_offline(),
233 },
234 ));
235 }
236 }
237}