1use std::{io, net::SocketAddr, sync::Arc};
2
3use azalea_entity::{LocalEntity, indexing::EntityUuidIndex};
4use azalea_protocol::{
5 ServerAddress,
6 connect::{Connection, ConnectionError, Proxy},
7 packets::{
8 ClientIntention, ConnectionProtocol, PROTOCOL_VERSION,
9 handshake::ServerboundIntention,
10 login::{ClientboundLoginPacket, ServerboundHello, ServerboundLoginPacket},
11 },
12};
13use azalea_world::Instance;
14use bevy_app::prelude::*;
15use bevy_ecs::prelude::*;
16use bevy_tasks::{IoTaskPool, Task, futures_lite::future};
17use parking_lot::RwLock;
18use tokio::sync::mpsc;
19use tracing::{debug, warn};
20
21use super::events::LocalPlayerEvents;
22use crate::{
23 Account, JoinError, LocalPlayerBundle,
24 connection::RawConnection,
25 packet::login::{InLoginState, SendLoginPacketEvent},
26};
27
28pub struct JoinPlugin;
30impl Plugin for JoinPlugin {
31 fn build(&self, app: &mut App) {
32 app.add_event::<StartJoinServerEvent>()
33 .add_event::<ConnectionFailedEvent>()
34 .add_systems(
35 Update,
36 (
37 handle_start_join_server_event.before(super::login::poll_auth_task),
38 poll_create_connection_task,
39 handle_connection_failed_events,
40 )
41 .chain(),
42 );
43 }
44}
45
46#[derive(Event, Debug)]
51pub struct StartJoinServerEvent {
52 pub account: Account,
53 pub connect_opts: ConnectOpts,
54 pub event_sender: Option<mpsc::UnboundedSender<crate::Event>>,
55
56 pub start_join_callback_tx: Option<StartJoinCallback>,
57}
58
59#[derive(Debug, Clone, Component)]
64pub struct ConnectOpts {
65 pub address: ServerAddress,
66 pub resolved_address: SocketAddr,
67 pub proxy: Option<Proxy>,
68}
69
70#[derive(Event)]
77pub struct ConnectionFailedEvent {
78 pub entity: Entity,
79 pub error: ConnectionError,
80}
81
82#[derive(Component, Debug, Clone)]
85pub struct StartJoinCallback(pub mpsc::UnboundedSender<Result<Entity, JoinError>>);
86
87pub fn handle_start_join_server_event(
88 mut commands: Commands,
89 mut events: EventReader<StartJoinServerEvent>,
90 mut entity_uuid_index: ResMut<EntityUuidIndex>,
91 connection_query: Query<&RawConnection>,
92) {
93 for event in events.read() {
94 let uuid = event.account.uuid_or_offline();
95 let entity = if let Some(entity) = entity_uuid_index.get(&uuid) {
96 debug!("Reusing entity {entity:?} for client");
97
98 if let Ok(conn) = connection_query.get(entity)
100 && conn.is_alive()
101 {
102 if let Some(start_join_callback_tx) = &event.start_join_callback_tx {
103 warn!(
104 "Received StartJoinServerEvent for {entity:?} but it's already connected. Ignoring the event but replying with Ok."
105 );
106 let _ = start_join_callback_tx.0.send(Ok(entity));
107 } else {
108 warn!(
109 "Received StartJoinServerEvent for {entity:?} but it's already connected. Ignoring the event."
110 );
111 }
112 return;
113 }
114
115 entity
116 } else {
117 let entity = commands.spawn_empty().id();
118 debug!("Created new entity {entity:?} for client");
119 entity_uuid_index.insert(uuid, entity);
121 entity
122 };
123
124 let mut entity_mut = commands.entity(entity);
125
126 entity_mut.insert((
127 event.account.to_owned(),
129 LocalEntity,
132 event.connect_opts.clone(),
134 ));
138
139 if let Some(event_sender) = &event.event_sender {
140 entity_mut.insert(LocalPlayerEvents(event_sender.clone()));
143 }
144 if let Some(start_join_callback) = &event.start_join_callback_tx {
145 entity_mut.insert(start_join_callback.clone());
146 }
147
148 let task_pool = IoTaskPool::get();
149 let connect_opts = event.connect_opts.clone();
150 let task = task_pool.spawn(async_compat::Compat::new(
151 create_conn_and_send_intention_packet(connect_opts),
152 ));
153
154 entity_mut.insert(CreateConnectionTask(task));
155 }
156}
157
158async fn create_conn_and_send_intention_packet(
159 opts: ConnectOpts,
160) -> Result<LoginConn, ConnectionError> {
161 let mut conn = if let Some(proxy) = opts.proxy {
162 Connection::new_with_proxy(&opts.resolved_address, proxy).await?
163 } else {
164 Connection::new(&opts.resolved_address).await?
165 };
166
167 conn.write(ServerboundIntention {
168 protocol_version: PROTOCOL_VERSION,
169 hostname: opts.address.host.clone(),
170 port: opts.address.port,
171 intention: ClientIntention::Login,
172 })
173 .await?;
174
175 let conn = conn.login();
176
177 Ok(conn)
178}
179
180type LoginConn = Connection<ClientboundLoginPacket, ServerboundLoginPacket>;
181
182#[derive(Component)]
183pub struct CreateConnectionTask(pub Task<Result<LoginConn, ConnectionError>>);
184
185pub fn poll_create_connection_task(
186 mut commands: Commands,
187 mut query: Query<(
188 Entity,
189 &mut CreateConnectionTask,
190 &Account,
191 Option<&StartJoinCallback>,
192 )>,
193 mut connection_failed_events: EventWriter<ConnectionFailedEvent>,
194) {
195 for (entity, mut task, account, mut start_join_callback) in query.iter_mut() {
196 if let Some(poll_res) = future::block_on(future::poll_once(&mut task.0)) {
197 let mut entity_mut = commands.entity(entity);
198 entity_mut.remove::<CreateConnectionTask>();
199 let conn = match poll_res {
200 Ok(conn) => conn,
201 Err(error) => {
202 warn!("failed to create connection: {error}");
203 connection_failed_events.write(ConnectionFailedEvent { entity, error });
204 return;
205 }
206 };
207
208 let (read_conn, write_conn) = conn.into_split();
209 let (read_conn, write_conn) = (read_conn.raw, write_conn.raw);
210
211 let instance = Instance::default();
212 let instance_holder = crate::local_player::InstanceHolder::new(
213 entity,
214 Arc::new(RwLock::new(instance)),
217 );
218
219 entity_mut.insert((
220 LocalPlayerBundle {
222 raw_connection: RawConnection::new(
223 read_conn,
224 write_conn,
225 ConnectionProtocol::Login,
226 ),
227 client_information: crate::ClientInformation::default(),
228 instance_holder,
229 metadata: azalea_entity::metadata::PlayerMetadataBundle::default(),
230 },
231 InLoginState,
232 ));
233
234 commands.trigger(SendLoginPacketEvent::new(
235 entity,
236 ServerboundHello {
237 name: account.username.clone(),
238 profile_id: account.uuid_or_offline(),
239 },
240 ));
241
242 if let Some(cb) = start_join_callback.take() {
243 let _ = cb.0.send(Ok(entity));
244 }
245 }
246 }
247}
248
249pub fn handle_connection_failed_events(
250 mut events: EventReader<ConnectionFailedEvent>,
251 query: Query<&StartJoinCallback>,
252) {
253 for event in events.read() {
254 let Ok(start_join_callback) = query.get(event.entity) else {
255 continue;
257 };
258
259 let ConnectionError::Io(err) = &event.error;
262 let cloned_err = ConnectionError::Io(io::Error::new(err.kind(), err.to_string()));
263
264 let _ = start_join_callback.0.send(Err(cloned_err.into()));
265 }
266}