Skip to main content

azalea_protocol/
connect.rs

1//! Connect to remote servers/clients.
2
3use std::{
4    fmt::{self, Debug, Display},
5    io::{self, Cursor},
6    marker::PhantomData,
7    net::SocketAddr,
8};
9
10#[cfg(feature = "online-mode")]
11use azalea_auth::{
12    game_profile::GameProfile,
13    sessionserver::{ClientSessionServerError, ServerSessionServerError},
14};
15use azalea_crypto::{Aes128CfbDec, Aes128CfbEnc};
16use thiserror::Error;
17use tokio::{
18    io::{AsyncWriteExt, BufStream},
19    net::{
20        TcpStream,
21        tcp::{OwnedReadHalf, OwnedWriteHalf, ReuniteError},
22    },
23};
24use tracing::{error, info};
25#[cfg(feature = "online-mode")]
26use uuid::Uuid;
27
28#[cfg(feature = "online-mode")]
29use crate::packets::login::ClientboundHello;
30use crate::{
31    packets::{
32        ProtocolPacket,
33        config::{ClientboundConfigPacket, ServerboundConfigPacket},
34        game::{ClientboundGamePacket, ServerboundGamePacket},
35        handshake::{ClientboundHandshakePacket, ServerboundHandshakePacket},
36        login::{ClientboundLoginPacket, ServerboundLoginPacket},
37        status::{ClientboundStatusPacket, ServerboundStatusPacket},
38    },
39    read::{ReadPacketError, deserialize_packet, read_raw_packet, try_read_raw_packet},
40    write::{serialize_packet, write_raw_packet},
41};
42
/// The state-agnostic read half of a connection.
///
/// Unlike [`ReadConnection`], this hands back packets as undecoded bytes.
pub struct RawReadConnection {
    /// The owned read half of the underlying TCP stream.
    pub read_stream: OwnedReadHalf,
    /// Buffer handed to the packet reader on every call; presumably holds
    /// bytes that were received but not yet consumed as a packet.
    pub buffer: Cursor<Vec<u8>>,
    /// The compression threshold, or `None` while compression is disabled.
    pub compression_threshold: Option<u32>,
    /// The cipher used for incoming data, or `None` while encryption hasn't
    /// been enabled.
    pub dec_cipher: Option<Aes128CfbDec>,
}
49
/// The state-agnostic write half of a connection.
///
/// Unlike [`WriteConnection`], this accepts packets as already-serialized
/// bytes.
pub struct RawWriteConnection {
    /// The owned write half of the underlying TCP stream.
    pub write_stream: OwnedWriteHalf,
    /// The compression threshold, or `None` while compression is disabled.
    pub compression_threshold: Option<u32>,
    /// The cipher used for outgoing data, or `None` while encryption hasn't
    /// been enabled.
    pub enc_cipher: Option<Aes128CfbEnc>,
}
55
/// The read half of a connection.
///
/// `R` is the packet type that incoming packets are deserialized into.
pub struct ReadConnection<R: ProtocolPacket> {
    /// The underlying state-agnostic reader.
    pub raw: RawReadConnection,
    /// Ties this half to the packet type `R` without storing a value of it.
    _reading: PhantomData<R>,
}
61
/// The write half of a connection.
///
/// `W` is the packet type that this half accepts for writing.
pub struct WriteConnection<W: ProtocolPacket> {
    /// The underlying state-agnostic writer.
    pub raw: RawWriteConnection,
    /// Ties this half to the packet type `W` without storing a value of it.
    _writing: PhantomData<W>,
}
67
/// A connection that can read and write packets.
///
/// `R` is the packet type that gets read and `W` is the packet type that gets
/// written; switching protocol state produces a `Connection` with different
/// type parameters.
///
/// # Examples
///
/// Join an offline-mode server and go through the handshake.
/// ```rust,no_run
/// use azalea_protocol::{
///     connect::Connection,
///     packets::{
///         self, ClientIntention, PROTOCOL_VERSION,
///         handshake::ServerboundIntention,
///         login::{ClientboundLoginPacket, ServerboundHello, ServerboundKey},
///     },
///     resolver,
/// };
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
///     let resolved_address = resolver::resolve_address(&"localhost".try_into().unwrap()).await?;
///     let mut conn = Connection::new(&resolved_address).await?;
///
///     // handshake
///     conn.write(ServerboundIntention {
///         protocol_version: PROTOCOL_VERSION,
///         hostname: resolved_address.ip().to_string(),
///         port: resolved_address.port(),
///         intention: ClientIntention::Login,
///     })
///     .await?;
///
///     let mut conn = conn.login();
///
///     // login
///     conn.write(ServerboundHello {
///         name: "bot".to_owned(),
///         profile_id: uuid::Uuid::nil(),
///     })
///     .await?;
///
///     let (conn, game_profile) = loop {
///         let packet = conn.read().await?;
///         match packet {
///             ClientboundLoginPacket::Hello(p) => {
///                 let e = azalea_crypto::encrypt(&p.public_key, &p.challenge).unwrap();
///
///                 conn.write(ServerboundKey {
///                     key_bytes: e.encrypted_public_key,
///                     encrypted_challenge: e.encrypted_challenge,
///                 })
///                 .await?;
///                 conn.set_encryption_key(e.secret_key);
///             }
///             ClientboundLoginPacket::LoginCompression(p) => {
///                 conn.set_compression_threshold(p.compression_threshold);
///             }
///             ClientboundLoginPacket::LoginFinished(p) => {
///                 break (conn.config(), p.game_profile);
///             }
///             ClientboundLoginPacket::LoginDisconnect(p) => {
///                 eprintln!("login disconnect: {}", p.reason);
///                 return Err("login disconnect".into());
///             }
///             ClientboundLoginPacket::CustomQuery(p) => {}
///             ClientboundLoginPacket::CookieRequest(p) => {
///                 conn.write(packets::login::ServerboundCookieResponse {
///                     key: p.key,
///                     payload: None,
///                 })
///                 .await?;
///             }
///         }
///     };
///
///     Ok(())
/// }
/// ```
pub struct Connection<R: ProtocolPacket, W: ProtocolPacket> {
    /// The half that reads packets of type `R`.
    pub reader: ReadConnection<R>,
    /// The half that writes packets of type `W`.
    pub writer: WriteConnection<W>,
}
148
149impl RawReadConnection {
150    pub async fn read(&mut self) -> Result<Box<[u8]>, Box<ReadPacketError>> {
151        read_raw_packet::<_>(
152            &mut self.read_stream,
153            &mut self.buffer,
154            self.compression_threshold,
155            &mut self.dec_cipher,
156        )
157        .await
158    }
159
160    pub fn try_read(&mut self) -> Result<Option<Box<[u8]>>, Box<ReadPacketError>> {
161        try_read_raw_packet::<_>(
162            &mut self.read_stream,
163            &mut self.buffer,
164            self.compression_threshold,
165            &mut self.dec_cipher,
166        )
167    }
168}
169
170impl RawWriteConnection {
171    pub async fn write(&mut self, packet: &[u8]) -> io::Result<()> {
172        if let Err(e) = write_raw_packet(
173            packet,
174            &mut self.write_stream,
175            self.compression_threshold,
176            &mut self.enc_cipher,
177        )
178        .await
179        {
180            // detect broken pipe
181            if e.kind() == io::ErrorKind::BrokenPipe {
182                info!("Broken pipe, shutting down connection.");
183                if let Err(e) = self.shutdown().await {
184                    error!("Couldn't shut down: {}", e);
185                }
186            }
187            return Err(e);
188        }
189        Ok(())
190    }
191
192    /// End the connection.
193    pub async fn shutdown(&mut self) -> io::Result<()> {
194        self.write_stream.shutdown().await
195    }
196}
197
198impl<R> ReadConnection<R>
199where
200    R: ProtocolPacket + Debug,
201{
202    /// Read a packet from the stream.
203    pub async fn read(&mut self) -> Result<R, Box<ReadPacketError>> {
204        let raw_packet = self.raw.read().await?;
205        deserialize_packet(&mut Cursor::new(&raw_packet))
206    }
207
208    /// Try to read a packet from the stream, or return Ok(None) if there's no
209    /// packet.
210    pub fn try_read(&mut self) -> Result<Option<R>, Box<ReadPacketError>> {
211        let Some(raw_packet) = self.raw.try_read()? else {
212            return Ok(None);
213        };
214        Ok(Some(deserialize_packet(&mut Cursor::new(&raw_packet))?))
215    }
216}
217impl<W> WriteConnection<W>
218where
219    W: ProtocolPacket + Debug,
220{
221    /// Write a packet to the server.
222    pub async fn write(&mut self, packet: W) -> io::Result<()> {
223        self.raw.write(&serialize_packet(&packet).unwrap()).await
224    }
225
226    /// Write a batch of packets to the server
227    pub async fn write_batch(&mut self, packets: &[W]) -> io::Result<()> {
228        let serialized_packets: Vec<u8> = packets
229            .into_iter()
230            .flat_map(|packet| serialize_packet(packet).unwrap())
231            .collect();
232        self.raw.write(&serialized_packets).await
233    }
234
235    /// End the connection.
236    pub async fn shutdown(&mut self) -> io::Result<()> {
237        self.raw.shutdown().await
238    }
239}
240
241impl<R, W> Connection<R, W>
242where
243    R: ProtocolPacket + Debug,
244    W: ProtocolPacket + Debug,
245{
246    /// Read a packet from the other side of the connection.
247    pub async fn read(&mut self) -> Result<R, Box<ReadPacketError>> {
248        self.reader.read().await
249    }
250
251    /// Try to read a packet from the other side of the connection, or return
252    /// Ok(None) if there's no packet to read.
253    pub fn try_read(&mut self) -> Result<Option<R>, Box<ReadPacketError>> {
254        self.reader.try_read()
255    }
256
257    /// Write a packet to the other side of the connection.
258    pub async fn write(&mut self, packet: impl crate::packets::Packet<W>) -> io::Result<()> {
259        let packet = packet.into_variant();
260        self.writer.write(packet).await
261    }
262
263    /// Write a batch of packets to the other side of the connection.
264    pub async fn write_batch(&mut self, packets: &[W]) -> io::Result<()> {
265        self.writer.write_batch(packets).await
266    }
267
268    /// Split the reader and writer into two objects.
269    ///
270    /// This doesn't allocate.
271    #[must_use]
272    pub fn into_split(self) -> (ReadConnection<R>, WriteConnection<W>) {
273        (self.reader, self.writer)
274    }
275
276    /// Split the reader and writer into the state-agnostic
277    /// [`RawReadConnection`] and [`RawWriteConnection`] types.
278    ///
279    /// This is meant to help with some types of proxies.
280    #[must_use]
281    pub fn into_split_raw(self) -> (RawReadConnection, RawWriteConnection) {
282        (self.reader.raw, self.writer.raw)
283    }
284}
285
/// The error returned when creating a [`Connection`] fails.
#[derive(Debug, Error)]
pub enum ConnectionError {
    /// An I/O error; displayed using the underlying error's own message.
    #[error("{0}")]
    Io(#[from] io::Error),
}
291
292use socks5_impl::protocol::UserKey;
293
/// An address and authentication method for connecting to a SOCKS5 proxy.
#[derive(Clone, Debug)]
pub struct Proxy {
    /// The socket address of the proxy server itself.
    pub addr: SocketAddr,
    /// The credentials passed to the SOCKS5 client when connecting, or `None`
    /// to pass none.
    pub auth: Option<UserKey>,
}
300
301impl Proxy {
302    pub fn new(addr: SocketAddr, auth: Option<UserKey>) -> Self {
303        Self { addr, auth }
304    }
305}
306impl Display for Proxy {
307    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
308        write!(f, "socks5://")?;
309        if let Some(auth) = &self.auth {
310            write!(f, "{auth}@")?;
311        }
312        write!(f, "{}", self.addr)
313    }
314}
315
#[cfg(feature = "online-mode")]
impl From<Proxy> for reqwest::Proxy {
    /// Convert into a `reqwest` proxy that applies to all traffic, going
    /// through the URL produced by the [`Display`] implementation.
    ///
    /// # Panics
    ///
    /// Panics if `reqwest` rejects that URL.
    fn from(proxy: Proxy) -> Self {
        let url = proxy.to_string();
        Self::all(url).expect("azalea proxies should not fail to parse as reqwest proxies")
    }
}
323
324impl Connection<ClientboundHandshakePacket, ServerboundHandshakePacket> {
325    /// Create a new connection to the given address.
326    pub async fn new(address: &SocketAddr) -> Result<Self, ConnectionError> {
327        let stream = TcpStream::connect(address).await?;
328
329        // enable tcp_nodelay
330        stream.set_nodelay(true)?;
331
332        Self::new_from_stream(stream).await
333    }
334
335    /// Create a new connection to the given address and SOCKS5 proxy.
336    ///
337    /// If you're not using a proxy, use [`Self::new`] instead.
338    pub async fn new_with_proxy(
339        address: &SocketAddr,
340        proxy: Proxy,
341    ) -> Result<Self, ConnectionError> {
342        let proxy_stream = TcpStream::connect(proxy.addr).await?;
343        let mut stream = BufStream::new(proxy_stream);
344
345        let _ = socks5_impl::client::connect(&mut stream, address, proxy.auth)
346            .await
347            .map_err(io::Error::other)?;
348
349        Self::new_from_stream(stream.into_inner()).await
350    }
351
352    /// Create a new connection from an existing stream.
353    ///
354    /// Useful if you want to set custom options on the stream. Otherwise, just
355    /// use [`Self::new`].
356    pub async fn new_from_stream(stream: TcpStream) -> Result<Self, ConnectionError> {
357        let (read_stream, write_stream) = stream.into_split();
358
359        Ok(Connection {
360            reader: ReadConnection {
361                raw: RawReadConnection {
362                    read_stream,
363                    buffer: Cursor::new(Vec::new()),
364                    compression_threshold: None,
365                    dec_cipher: None,
366                },
367                _reading: PhantomData,
368            },
369            writer: WriteConnection {
370                raw: RawWriteConnection {
371                    write_stream,
372                    compression_threshold: None,
373                    enc_cipher: None,
374                },
375                _writing: PhantomData,
376            },
377        })
378    }
379
380    /// Change our state from handshake to login.
381    ///
382    /// This is the state that is used for logging in.
383    #[must_use]
384    pub fn login(self) -> Connection<ClientboundLoginPacket, ServerboundLoginPacket> {
385        Connection::from(self)
386    }
387
388    /// Change our state from handshake to status.
389    ///
390    /// This is the state that is used for pinging the server.
391    #[must_use]
392    pub fn status(self) -> Connection<ClientboundStatusPacket, ServerboundStatusPacket> {
393        Connection::from(self)
394    }
395}
396
397impl Connection<ClientboundLoginPacket, ServerboundLoginPacket> {
398    /// Set our compression threshold, i.e. the maximum size that a packet is
399    /// allowed to be without getting compressed.
400    ///
401    /// Setting it to 0 means every packet will be compressed. If you set it to
402    /// less than 0 then compression is disabled.
403    pub fn set_compression_threshold(&mut self, threshold: i32) {
404        // if you pass a threshold of less than 0, compression is disabled
405        if threshold >= 0 {
406            self.reader.raw.compression_threshold = Some(threshold as u32);
407            self.writer.raw.compression_threshold = Some(threshold as u32);
408        } else {
409            self.reader.raw.compression_threshold = None;
410            self.writer.raw.compression_threshold = None;
411        }
412    }
413
414    /// Set the encryption key that is used to encrypt and decrypt packets.
415    ///
416    /// It's the same for both reading and writing.
417    pub fn set_encryption_key(&mut self, key: [u8; 16]) {
418        let (enc_cipher, dec_cipher) = azalea_crypto::create_cipher(&key);
419        self.reader.raw.dec_cipher = Some(dec_cipher);
420        self.writer.raw.enc_cipher = Some(enc_cipher);
421    }
422
423    /// Change our state from login to configuration.
424    ///
425    /// This is the state where the server sends us the registries and the
426    /// resource pack.
427    #[must_use]
428    pub fn config(self) -> Connection<ClientboundConfigPacket, ServerboundConfigPacket> {
429        Connection::from(self)
430    }
431
432    /// Authenticate with Minecraft's servers, which is required to join
433    /// online-mode servers.
434    ///
435    /// This must happen when you get a `ClientboundLoginPacket::Hello` packet.
436    ///
437    /// # Examples
438    ///
439    /// ```rust,no_run
440    /// use azalea_auth::AuthResult;
441    /// use azalea_protocol::{
442    ///     connect::Connection,
443    ///     packets::login::{ClientboundLoginPacket, ServerboundKey},
444    /// };
445    /// use uuid::Uuid;
446    /// # use azalea_protocol::ServerAddress;
447    ///
448    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
449    /// let AuthResult {
450    ///     access_token,
451    ///     profile,
452    /// } = azalea_auth::auth("[email protected]", azalea_auth::AuthOpts::default())
453    ///     .await
454    ///     .expect("Couldn't authenticate");
455    /// #
456    /// # let address = ServerAddress::try_from("[email protected]").unwrap();
457    /// # let resolved_address = azalea_protocol::resolver::resolve_address(&address).await?;
458    ///
459    /// let mut conn = Connection::new(&resolved_address).await?;
460    ///
461    /// // transition to the login state, in a real program we would have done a handshake first
462    /// let mut conn = conn.login();
463    ///
464    /// match conn.read().await? {
465    ///     ClientboundLoginPacket::Hello(p) => {
466    ///         // tell Mojang we're joining the server & enable encryption
467    ///         let e = azalea_crypto::encrypt(&p.public_key, &p.challenge).unwrap();
468    ///         conn.authenticate(&access_token, &profile.id, e.secret_key, &p, None)
469    ///             .await?;
470    ///         conn.write(ServerboundKey {
471    ///             key_bytes: e.encrypted_public_key,
472    ///             encrypted_challenge: e.encrypted_challenge,
473    ///         })
474    ///         .await?;
475    ///         conn.set_encryption_key(e.secret_key);
476    ///     }
477    ///     _ => {}
478    /// }
479    /// # Ok(())
480    /// # }
481    /// ```
482    #[cfg(feature = "online-mode")]
483    pub async fn authenticate(
484        &self,
485        access_token: &str,
486        uuid: &Uuid,
487        private_key: [u8; 16],
488        packet: &ClientboundHello,
489        sessionserver_proxy: Option<Proxy>,
490    ) -> Result<(), ClientSessionServerError> {
491        use azalea_auth::sessionserver::{self, SessionServerJoinOpts};
492
493        sessionserver::join(SessionServerJoinOpts {
494            access_token,
495            public_key: &packet.public_key,
496            private_key: &private_key,
497            uuid,
498            server_id: &packet.server_id,
499            proxy: sessionserver_proxy.map(Proxy::into),
500        })
501        .await
502    }
503}
504
505impl Connection<ServerboundHandshakePacket, ClientboundHandshakePacket> {
506    /// Change our state from handshake to login.
507    ///
508    /// This is the state that is used while negotiating encryption and
509    /// authenticating with Mojang.
510    #[must_use]
511    pub fn login(self) -> Connection<ServerboundLoginPacket, ClientboundLoginPacket> {
512        Connection::from(self)
513    }
514
515    /// Change our state from handshake to status.
516    ///
517    /// This is the state that is used for pinging the server.
518    #[must_use]
519    pub fn status(self) -> Connection<ServerboundStatusPacket, ClientboundStatusPacket> {
520        Connection::from(self)
521    }
522}
523
524impl Connection<ServerboundLoginPacket, ClientboundLoginPacket> {
525    /// Set our compression threshold, i.e. the maximum size that a packet is
526    /// allowed to be without getting compressed.
527    ///
528    /// If you set it to less than 0 then compression gets disabled.
529    pub fn set_compression_threshold(&mut self, threshold: i32) {
530        // if you pass a threshold of less than 0, compression is disabled
531        if threshold >= 0 {
532            self.reader.raw.compression_threshold = Some(threshold as u32);
533            self.writer.raw.compression_threshold = Some(threshold as u32);
534        } else {
535            self.reader.raw.compression_threshold = None;
536            self.writer.raw.compression_threshold = None;
537        }
538    }
539
540    /// Set the encryption key that is used to encrypt and decrypt packets.
541    ///
542    /// It's the same for both reading and writing.
543    pub fn set_encryption_key(&mut self, key: [u8; 16]) {
544        let (enc_cipher, dec_cipher) = azalea_crypto::create_cipher(&key);
545        self.reader.raw.dec_cipher = Some(dec_cipher);
546        self.writer.raw.enc_cipher = Some(enc_cipher);
547    }
548
549    /// Change our state from login to game.
550    ///
551    /// This is the state that's used when the client is actually in the game.
552    #[must_use]
553    pub fn game(self) -> Connection<ServerboundGamePacket, ClientboundGamePacket> {
554        Connection::from(self)
555    }
556
557    /// Verify connecting clients have authenticated with Minecraft's servers.
558    /// This must happen after the client sends a `ServerboundLoginPacket::Key`
559    /// packet.
560    #[cfg(feature = "online-mode")]
561    pub async fn authenticate(
562        &self,
563        username: &str,
564        public_key: &[u8],
565        private_key: &[u8; 16],
566        ip: Option<&str>,
567    ) -> Result<GameProfile, ServerSessionServerError> {
568        azalea_auth::sessionserver::serverside_auth(username, public_key, private_key, ip).await
569    }
570
571    /// Change our state back to configuration.
572    #[must_use]
573    pub fn config(self) -> Connection<ServerboundConfigPacket, ClientboundConfigPacket> {
574        Connection::from(self)
575    }
576}
577
578impl Connection<ServerboundConfigPacket, ClientboundConfigPacket> {
579    /// Change our state from configuration to game.
580    ///
581    /// This is the state that's used when the client is actually in the world.
582    #[must_use]
583    pub fn game(self) -> Connection<ServerboundGamePacket, ClientboundGamePacket> {
584        Connection::from(self)
585    }
586}
587
588impl Connection<ClientboundConfigPacket, ServerboundConfigPacket> {
589    /// Change our state from configuration to game.
590    ///
591    /// This is the state that's used when the client is actually in the world.
592    #[must_use]
593    pub fn game(self) -> Connection<ClientboundGamePacket, ServerboundGamePacket> {
594        Connection::from(self)
595    }
596}
597
598impl Connection<ClientboundGamePacket, ServerboundGamePacket> {
599    /// Change our state back to configuration.
600    #[must_use]
601    pub fn config(self) -> Connection<ClientboundConfigPacket, ServerboundConfigPacket> {
602        Connection::from(self)
603    }
604}
605impl Connection<ServerboundGamePacket, ClientboundGamePacket> {
606    /// Change our state back to configuration.
607    #[must_use]
608    pub fn config(self) -> Connection<ServerboundConfigPacket, ClientboundConfigPacket> {
609        Connection::from(self)
610    }
611}
612
613// rust doesn't let us implement From because allegedly it conflicts with
614// `core`'s "impl<T> From<T> for T" so we do this instead
615impl<R1, W1> Connection<R1, W1>
616where
617    R1: ProtocolPacket + Debug,
618    W1: ProtocolPacket + Debug,
619{
620    /// Creates a `Connection` of a type from a `Connection` of another type.
621    /// Useful for servers or custom packets.
622    #[must_use]
623    pub fn from<R2, W2>(connection: Connection<R1, W1>) -> Connection<R2, W2>
624    where
625        R2: ProtocolPacket + Debug,
626        W2: ProtocolPacket + Debug,
627    {
628        Connection {
629            reader: ReadConnection {
630                raw: connection.reader.raw,
631                _reading: PhantomData,
632            },
633            writer: WriteConnection {
634                raw: connection.writer.raw,
635                _writing: PhantomData,
636            },
637        }
638    }
639
640    /// Convert an existing `TcpStream` into a `Connection`. Useful for servers.
641    pub fn wrap(stream: TcpStream) -> Connection<R1, W1> {
642        let (read_stream, write_stream) = stream.into_split();
643
644        Connection {
645            reader: ReadConnection {
646                raw: RawReadConnection {
647                    read_stream,
648                    buffer: Cursor::new(Vec::new()),
649                    compression_threshold: None,
650                    dec_cipher: None,
651                },
652                _reading: PhantomData,
653            },
654            writer: WriteConnection {
655                raw: RawWriteConnection {
656                    write_stream,
657                    compression_threshold: None,
658                    enc_cipher: None,
659                },
660                _writing: PhantomData,
661            },
662        }
663    }
664
665    /// Convert from a `Connection` into a `TcpStream`. Useful for servers.
666    pub fn unwrap(self) -> Result<TcpStream, ReuniteError> {
667        self.reader
668            .raw
669            .read_stream
670            .reunite(self.writer.raw.write_stream)
671    }
672}