azalea_client/plugins/
join.rs1use std::{net::SocketAddr, sync::Arc};
2
3use azalea_entity::{LocalEntity, indexing::EntityUuidIndex};
4use azalea_protocol::{
5 ServerAddress,
6 common::client_information::ClientInformation,
7 connect::{Connection, ConnectionError, Proxy},
8 packets::{
9 ClientIntention, ConnectionProtocol, PROTOCOL_VERSION,
10 handshake::ServerboundIntention,
11 login::{ClientboundLoginPacket, ServerboundHello, ServerboundLoginPacket},
12 },
13};
14use azalea_world::Instance;
15use bevy_app::prelude::*;
16use bevy_ecs::prelude::*;
17use bevy_tasks::{IoTaskPool, Task, futures_lite::future};
18use parking_lot::RwLock;
19use tokio::sync::mpsc;
20use tracing::{debug, warn};
21
22use super::events::LocalPlayerEvents;
23use crate::{
24 Account, LocalPlayerBundle,
25 connection::RawConnection,
26 packet::login::{InLoginState, SendLoginPacketEvent},
27};
28
29pub struct JoinPlugin;
31impl Plugin for JoinPlugin {
32 fn build(&self, app: &mut App) {
33 app.add_event::<StartJoinServerEvent>()
34 .add_event::<ConnectionFailedEvent>()
35 .add_systems(
36 Update,
37 (
38 handle_start_join_server_event.before(super::login::poll_auth_task),
39 poll_create_connection_task,
40 )
41 .chain(),
42 );
43 }
44}
45
46#[derive(Event, Debug)]
51pub struct StartJoinServerEvent {
52 pub account: Account,
53 pub connect_opts: ConnectOpts,
54 pub event_sender: Option<mpsc::UnboundedSender<crate::Event>>,
55
56 pub start_join_callback_tx: Option<mpsc::UnboundedSender<Entity>>,
58}
59
60#[derive(Debug, Clone, Component)]
65pub struct ConnectOpts {
66 pub address: ServerAddress,
67 pub resolved_address: SocketAddr,
68 pub proxy: Option<Proxy>,
69}
70
71#[derive(Event)]
78pub struct ConnectionFailedEvent {
79 pub entity: Entity,
80 pub error: ConnectionError,
81}
82
83pub fn handle_start_join_server_event(
84 mut commands: Commands,
85 mut events: EventReader<StartJoinServerEvent>,
86 mut entity_uuid_index: ResMut<EntityUuidIndex>,
87 connection_query: Query<&RawConnection>,
88) {
89 for event in events.read() {
90 let uuid = event.account.uuid_or_offline();
91 let entity = if let Some(entity) = entity_uuid_index.get(&uuid) {
92 debug!("Reusing entity {entity:?} for client");
93
94 if let Ok(conn) = connection_query.get(entity)
96 && conn.is_alive()
97 {
98 if let Some(start_join_callback_tx) = &event.start_join_callback_tx {
99 warn!(
100 "Received StartJoinServerEvent for {entity:?} but it's already connected. Ignoring the event but replying with Ok."
101 );
102 let _ = start_join_callback_tx.send(entity);
103 } else {
104 warn!(
105 "Received StartJoinServerEvent for {entity:?} but it's already connected. Ignoring the event."
106 );
107 }
108 return;
109 }
110
111 entity
112 } else {
113 let entity = commands.spawn_empty().id();
114 debug!("Created new entity {entity:?} for client");
115 entity_uuid_index.insert(uuid, entity);
117 entity
118 };
119
120 if let Some(start_join_callback) = &event.start_join_callback_tx {
121 let _ = start_join_callback.send(entity);
122 }
123
124 let mut entity_mut = commands.entity(entity);
125
126 entity_mut.insert((
127 event.account.to_owned(),
129 LocalEntity,
132 ClientInformation::default(),
134 event.connect_opts.clone(),
136 ));
140
141 if let Some(event_sender) = &event.event_sender {
142 entity_mut.insert(LocalPlayerEvents(event_sender.clone()));
145 }
146
147 let task_pool = IoTaskPool::get();
148 let connect_opts = event.connect_opts.clone();
149 let task = task_pool.spawn(async_compat::Compat::new(
150 create_conn_and_send_intention_packet(connect_opts),
151 ));
152
153 entity_mut.insert(CreateConnectionTask(task));
154 }
155}
156
157async fn create_conn_and_send_intention_packet(
158 opts: ConnectOpts,
159) -> Result<LoginConn, ConnectionError> {
160 let mut conn = if let Some(proxy) = opts.proxy {
161 Connection::new_with_proxy(&opts.resolved_address, proxy).await?
162 } else {
163 Connection::new(&opts.resolved_address).await?
164 };
165
166 conn.write(ServerboundIntention {
167 protocol_version: PROTOCOL_VERSION,
168 hostname: opts.address.host.clone(),
169 port: opts.address.port,
170 intention: ClientIntention::Login,
171 })
172 .await?;
173
174 let conn = conn.login();
175
176 Ok(conn)
177}
178
179type LoginConn = Connection<ClientboundLoginPacket, ServerboundLoginPacket>;
180
181#[derive(Component)]
182pub struct CreateConnectionTask(pub Task<Result<LoginConn, ConnectionError>>);
183
184pub fn poll_create_connection_task(
185 mut commands: Commands,
186 mut query: Query<(Entity, &mut CreateConnectionTask, &Account)>,
187 mut connection_failed_events: EventWriter<ConnectionFailedEvent>,
188) {
189 for (entity, mut task, account) in query.iter_mut() {
190 if let Some(poll_res) = future::block_on(future::poll_once(&mut task.0)) {
191 let mut entity_mut = commands.entity(entity);
192 entity_mut.remove::<CreateConnectionTask>();
193 let conn = match poll_res {
194 Ok(conn) => conn,
195 Err(error) => {
196 warn!("failed to create connection: {error}");
197 connection_failed_events.write(ConnectionFailedEvent { entity, error });
198 return;
199 }
200 };
201
202 let (read_conn, write_conn) = conn.into_split();
203 let (read_conn, write_conn) = (read_conn.raw, write_conn.raw);
204
205 let instance = Instance::default();
206 let instance_holder = crate::local_player::InstanceHolder::new(
207 entity,
208 Arc::new(RwLock::new(instance)),
211 );
212
213 entity_mut.insert((
214 LocalPlayerBundle {
216 raw_connection: RawConnection::new(
217 read_conn,
218 write_conn,
219 ConnectionProtocol::Login,
220 ),
221 instance_holder,
222 metadata: azalea_entity::metadata::PlayerMetadataBundle::default(),
223 },
224 InLoginState,
225 ));
226
227 commands.trigger(SendLoginPacketEvent::new(
228 entity,
229 ServerboundHello {
230 name: account.username.clone(),
231 profile_id: account.uuid_or_offline(),
232 },
233 ));
234 }
235 }
236}