azalea_client/plugins/
join.rs

1use std::{io, net::SocketAddr, sync::Arc};
2
3use azalea_entity::{LocalEntity, indexing::EntityUuidIndex};
4use azalea_protocol::{
5    ServerAddress,
6    connect::{Connection, ConnectionError, Proxy},
7    packets::{
8        ClientIntention, ConnectionProtocol, PROTOCOL_VERSION,
9        handshake::ServerboundIntention,
10        login::{ClientboundLoginPacket, ServerboundHello, ServerboundLoginPacket},
11    },
12};
13use azalea_world::Instance;
14use bevy_app::prelude::*;
15use bevy_ecs::prelude::*;
16use bevy_tasks::{IoTaskPool, Task, futures_lite::future};
17use parking_lot::RwLock;
18use tokio::sync::mpsc;
19use tracing::{debug, warn};
20
21use super::events::LocalPlayerEvents;
22use crate::{
23    Account, JoinError, LocalPlayerBundle,
24    connection::RawConnection,
25    packet::login::{InLoginState, SendLoginPacketEvent},
26};
27
28/// A plugin that allows bots to join servers.
29pub struct JoinPlugin;
30impl Plugin for JoinPlugin {
31    fn build(&self, app: &mut App) {
32        app.add_event::<StartJoinServerEvent>()
33            .add_event::<ConnectionFailedEvent>()
34            .add_systems(
35                Update,
36                (
37                    handle_start_join_server_event.before(super::login::poll_auth_task),
38                    poll_create_connection_task,
39                    handle_connection_failed_events,
40                )
41                    .chain(),
42            );
43    }
44}
45
46/// An event to make a client join the server and be added to our swarm.
47///
48/// This won't do anything if a client with the Account UUID is already
49/// connected to the server.
50#[derive(Event, Debug)]
51pub struct StartJoinServerEvent {
52    pub account: Account,
53    pub connect_opts: ConnectOpts,
54    pub event_sender: Option<mpsc::UnboundedSender<crate::Event>>,
55
56    pub start_join_callback_tx: Option<StartJoinCallback>,
57}
58
59/// Options for how the connection to the server will be made. These are
60/// persisted on reconnects.
61///
62/// This is inserted as a component on clients to make auto-reconnecting work.
63#[derive(Debug, Clone, Component)]
64pub struct ConnectOpts {
65    pub address: ServerAddress,
66    pub resolved_address: SocketAddr,
67    pub proxy: Option<Proxy>,
68}
69
70/// An event that's sent when creating the TCP connection and sending the first
71/// packet fails.
72///
73/// This isn't sent if we're kicked later, see [`DisconnectEvent`].
74///
75/// [`DisconnectEvent`]: crate::disconnect::DisconnectEvent
76#[derive(Event)]
77pub struct ConnectionFailedEvent {
78    pub entity: Entity,
79    pub error: ConnectionError,
80}
81
82// this is mpsc instead of oneshot so it can be cloned (since it's sent in an
83// event)
84#[derive(Component, Debug, Clone)]
85pub struct StartJoinCallback(pub mpsc::UnboundedSender<Result<Entity, JoinError>>);
86
87pub fn handle_start_join_server_event(
88    mut commands: Commands,
89    mut events: EventReader<StartJoinServerEvent>,
90    mut entity_uuid_index: ResMut<EntityUuidIndex>,
91    connection_query: Query<&RawConnection>,
92) {
93    for event in events.read() {
94        let uuid = event.account.uuid_or_offline();
95        let entity = if let Some(entity) = entity_uuid_index.get(&uuid) {
96            debug!("Reusing entity {entity:?} for client");
97
98            // check if it's already connected
99            if let Ok(conn) = connection_query.get(entity)
100                && conn.is_alive()
101            {
102                if let Some(start_join_callback_tx) = &event.start_join_callback_tx {
103                    warn!(
104                        "Received StartJoinServerEvent for {entity:?} but it's already connected. Ignoring the event but replying with Ok."
105                    );
106                    let _ = start_join_callback_tx.0.send(Ok(entity));
107                } else {
108                    warn!(
109                        "Received StartJoinServerEvent for {entity:?} but it's already connected. Ignoring the event."
110                    );
111                }
112                return;
113            }
114
115            entity
116        } else {
117            let entity = commands.spawn_empty().id();
118            debug!("Created new entity {entity:?} for client");
119            // add to the uuid index
120            entity_uuid_index.insert(uuid, entity);
121            entity
122        };
123
124        let mut entity_mut = commands.entity(entity);
125
126        entity_mut.insert((
127            // add the Account to the entity now so plugins can access it earlier
128            event.account.to_owned(),
129            // localentity is always present for our clients, even if we're not actually logged
130            // in
131            LocalEntity,
132            // ConnectOpts is inserted as a component here
133            event.connect_opts.clone(),
134            // we don't insert InLoginState until we actually create the connection. note that
135            // there's no InHandshakeState component since we switch off of the handshake state
136            // immediately when the connection is created
137        ));
138
139        if let Some(event_sender) = &event.event_sender {
140            // this is optional so we don't leak memory in case the user doesn't want to
141            // handle receiving packets
142            entity_mut.insert(LocalPlayerEvents(event_sender.clone()));
143        }
144        if let Some(start_join_callback) = &event.start_join_callback_tx {
145            entity_mut.insert(start_join_callback.clone());
146        }
147
148        let task_pool = IoTaskPool::get();
149        let connect_opts = event.connect_opts.clone();
150        let task = task_pool.spawn(async_compat::Compat::new(
151            create_conn_and_send_intention_packet(connect_opts),
152        ));
153
154        entity_mut.insert(CreateConnectionTask(task));
155    }
156}
157
158async fn create_conn_and_send_intention_packet(
159    opts: ConnectOpts,
160) -> Result<LoginConn, ConnectionError> {
161    let mut conn = if let Some(proxy) = opts.proxy {
162        Connection::new_with_proxy(&opts.resolved_address, proxy).await?
163    } else {
164        Connection::new(&opts.resolved_address).await?
165    };
166
167    conn.write(ServerboundIntention {
168        protocol_version: PROTOCOL_VERSION,
169        hostname: opts.address.host.clone(),
170        port: opts.address.port,
171        intention: ClientIntention::Login,
172    })
173    .await?;
174
175    let conn = conn.login();
176
177    Ok(conn)
178}
179
180type LoginConn = Connection<ClientboundLoginPacket, ServerboundLoginPacket>;
181
182#[derive(Component)]
183pub struct CreateConnectionTask(pub Task<Result<LoginConn, ConnectionError>>);
184
185pub fn poll_create_connection_task(
186    mut commands: Commands,
187    mut query: Query<(
188        Entity,
189        &mut CreateConnectionTask,
190        &Account,
191        Option<&StartJoinCallback>,
192    )>,
193    mut connection_failed_events: EventWriter<ConnectionFailedEvent>,
194) {
195    for (entity, mut task, account, mut start_join_callback) in query.iter_mut() {
196        if let Some(poll_res) = future::block_on(future::poll_once(&mut task.0)) {
197            let mut entity_mut = commands.entity(entity);
198            entity_mut.remove::<CreateConnectionTask>();
199            let conn = match poll_res {
200                Ok(conn) => conn,
201                Err(error) => {
202                    warn!("failed to create connection: {error}");
203                    connection_failed_events.write(ConnectionFailedEvent { entity, error });
204                    return;
205                }
206            };
207
208            let (read_conn, write_conn) = conn.into_split();
209            let (read_conn, write_conn) = (read_conn.raw, write_conn.raw);
210
211            let instance = Instance::default();
212            let instance_holder = crate::local_player::InstanceHolder::new(
213                entity,
214                // default to an empty world, it'll be set correctly later when we
215                // get the login packet
216                Arc::new(RwLock::new(instance)),
217            );
218
219            entity_mut.insert((
220                // these stay when we switch to the game state
221                LocalPlayerBundle {
222                    raw_connection: RawConnection::new(
223                        read_conn,
224                        write_conn,
225                        ConnectionProtocol::Login,
226                    ),
227                    client_information: crate::ClientInformation::default(),
228                    instance_holder,
229                    metadata: azalea_entity::metadata::PlayerMetadataBundle::default(),
230                },
231                InLoginState,
232            ));
233
234            commands.trigger(SendLoginPacketEvent::new(
235                entity,
236                ServerboundHello {
237                    name: account.username.clone(),
238                    profile_id: account.uuid_or_offline(),
239                },
240            ));
241
242            if let Some(cb) = start_join_callback.take() {
243                let _ = cb.0.send(Ok(entity));
244            }
245        }
246    }
247}
248
249pub fn handle_connection_failed_events(
250    mut events: EventReader<ConnectionFailedEvent>,
251    query: Query<&StartJoinCallback>,
252) {
253    for event in events.read() {
254        let Ok(start_join_callback) = query.get(event.entity) else {
255            // the StartJoinCallback isn't required to be present, so this is fine
256            continue;
257        };
258
259        // io::Error isn't clonable, so we create a new one based on the `kind` and
260        // `to_string`,
261        let ConnectionError::Io(err) = &event.error;
262        let cloned_err = ConnectionError::Io(io::Error::new(err.kind(), err.to_string()));
263
264        let _ = start_join_callback.0.send(Err(cloned_err.into()));
265    }
266}