azalea_protocol/
connect.rs

1//! Connect to remote servers/clients.
2
3use std::{
4    fmt::{self, Debug, Display},
5    io::{self, Cursor},
6    marker::PhantomData,
7    net::SocketAddr,
8};
9
10#[cfg(feature = "online-mode")]
11use azalea_auth::{
12    game_profile::GameProfile,
13    sessionserver::{ClientSessionServerError, ServerSessionServerError},
14};
15use azalea_crypto::{Aes128CfbDec, Aes128CfbEnc};
16use thiserror::Error;
17use tokio::{
18    io::{AsyncWriteExt, BufStream},
19    net::{
20        TcpStream,
21        tcp::{OwnedReadHalf, OwnedWriteHalf, ReuniteError},
22    },
23};
24use tracing::{error, info};
25#[cfg(feature = "online-mode")]
26use uuid::Uuid;
27
28#[cfg(feature = "online-mode")]
29use crate::packets::login::ClientboundHello;
30use crate::{
31    packets::{
32        ProtocolPacket,
33        config::{ClientboundConfigPacket, ServerboundConfigPacket},
34        game::{ClientboundGamePacket, ServerboundGamePacket},
35        handshake::{ClientboundHandshakePacket, ServerboundHandshakePacket},
36        login::{ClientboundLoginPacket, ServerboundLoginPacket},
37        status::{ClientboundStatusPacket, ServerboundStatusPacket},
38    },
39    read::{ReadPacketError, deserialize_packet, read_raw_packet, try_read_raw_packet},
40    write::{serialize_packet, write_raw_packet},
41};
42
/// The state-agnostic read half of a connection.
///
/// It yields raw packet bytes instead of typed packets; see
/// [`ReadConnection`] for the typed wrapper.
pub struct RawReadConnection {
    /// The TCP read half that packets are read from.
    pub read_stream: OwnedReadHalf,
    /// Buffer handed to the packet reader together with the stream.
    // NOTE(review): presumably holds bytes that were received but not yet
    // framed into a full packet — confirm against `read.rs`.
    pub buffer: Cursor<Vec<u8>>,
    /// Compression threshold forwarded to the packet reader, or `None` if no
    /// threshold has been set.
    pub compression_threshold: Option<u32>,
    /// Decryption cipher forwarded to the packet reader, or `None` if no
    /// encryption key has been set.
    pub dec_cipher: Option<Aes128CfbDec>,
}
49
/// The state-agnostic write half of a connection.
///
/// It accepts already-serialized packet bytes; see [`WriteConnection`] for the
/// typed wrapper.
pub struct RawWriteConnection {
    /// The TCP write half that packets are written to.
    pub write_stream: OwnedWriteHalf,
    /// Compression threshold forwarded to the packet writer, or `None` if no
    /// threshold has been set.
    pub compression_threshold: Option<u32>,
    /// Encryption cipher forwarded to the packet writer, or `None` if no
    /// encryption key has been set.
    pub enc_cipher: Option<Aes128CfbEnc>,
}
55
/// The read half of a connection.
///
/// `R` is the packet type that [`ReadConnection::read`] deserializes into.
pub struct ReadConnection<R: ProtocolPacket> {
    /// The underlying raw connection that produces undecoded packet bytes.
    pub raw: RawReadConnection,
    /// Zero-sized marker recording the packet type `R`; no `R` is stored.
    _reading: PhantomData<R>,
}
61
/// The write half of a connection.
///
/// `W` is the packet type that [`WriteConnection::write`] accepts.
pub struct WriteConnection<W: ProtocolPacket> {
    /// The underlying raw connection that serialized packet bytes are sent to.
    pub raw: RawWriteConnection,
    /// Zero-sized marker recording the packet type `W`; no `W` is stored.
    _writing: PhantomData<W>,
}
67
/// A connection that can read and write packets.
///
/// `R` is the packet type that is read from the other side and `W` is the
/// packet type that is written to it. Changing protocol state converts the
/// connection into one with different `R`/`W` types.
///
/// # Examples
///
/// Join an offline-mode server and go through the handshake.
/// ```rust,no_run
/// use azalea_protocol::{
///     connect::Connection,
///     packets::{
///         self, ClientIntention, PROTOCOL_VERSION,
///         handshake::ServerboundIntention,
///         login::{ClientboundLoginPacket, ServerboundHello, ServerboundKey},
///     },
///     resolver,
/// };
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
///     let resolved_address = resolver::resolve_address(&"localhost".try_into().unwrap()).await?;
///     let mut conn = Connection::new(&resolved_address).await?;
///
///     // handshake
///     conn.write(ServerboundIntention {
///         protocol_version: PROTOCOL_VERSION,
///         hostname: resolved_address.ip().to_string(),
///         port: resolved_address.port(),
///         intention: ClientIntention::Login,
///     })
///     .await?;
///
///     let mut conn = conn.login();
///
///     // login
///     conn.write(ServerboundHello {
///         name: "bot".to_owned(),
///         profile_id: uuid::Uuid::nil(),
///     })
///     .await?;
///
///     let (conn, game_profile) = loop {
///         let packet = conn.read().await?;
///         match packet {
///             ClientboundLoginPacket::Hello(p) => {
///                 let e = azalea_crypto::encrypt(&p.public_key, &p.challenge).unwrap();
///
///                 conn.write(ServerboundKey {
///                     key_bytes: e.encrypted_public_key,
///                     encrypted_challenge: e.encrypted_challenge,
///                 })
///                 .await?;
///                 conn.set_encryption_key(e.secret_key);
///             }
///             ClientboundLoginPacket::LoginCompression(p) => {
///                 conn.set_compression_threshold(p.compression_threshold);
///             }
///             ClientboundLoginPacket::LoginFinished(p) => {
///                 break (conn.config(), p.game_profile);
///             }
///             ClientboundLoginPacket::LoginDisconnect(p) => {
///                 eprintln!("login disconnect: {}", p.reason);
///                 return Err("login disconnect".into());
///             }
///             ClientboundLoginPacket::CustomQuery(p) => {}
///             ClientboundLoginPacket::CookieRequest(p) => {
///                 conn.write(packets::login::ServerboundCookieResponse {
///                     key: p.key,
///                     payload: None,
///                 })
///                 .await?;
///             }
///         }
///     };
///
///     Ok(())
/// }
/// ```
pub struct Connection<R: ProtocolPacket, W: ProtocolPacket> {
    /// The half used for reading packets of type `R`.
    pub reader: ReadConnection<R>,
    /// The half used for writing packets of type `W`.
    pub writer: WriteConnection<W>,
}
148
149impl RawReadConnection {
150    pub async fn read(&mut self) -> Result<Box<[u8]>, Box<ReadPacketError>> {
151        read_raw_packet::<_>(
152            &mut self.read_stream,
153            &mut self.buffer,
154            self.compression_threshold,
155            &mut self.dec_cipher,
156        )
157        .await
158    }
159
160    pub fn try_read(&mut self) -> Result<Option<Box<[u8]>>, Box<ReadPacketError>> {
161        try_read_raw_packet::<_>(
162            &mut self.read_stream,
163            &mut self.buffer,
164            self.compression_threshold,
165            &mut self.dec_cipher,
166        )
167    }
168}
169
170impl RawWriteConnection {
171    pub async fn write(&mut self, packet: &[u8]) -> io::Result<()> {
172        if let Err(e) = write_raw_packet(
173            packet,
174            &mut self.write_stream,
175            self.compression_threshold,
176            &mut self.enc_cipher,
177        )
178        .await
179        {
180            // detect broken pipe
181            if e.kind() == io::ErrorKind::BrokenPipe {
182                info!("Broken pipe, shutting down connection.");
183                if let Err(e) = self.shutdown().await {
184                    error!("Couldn't shut down: {}", e);
185                }
186            }
187            return Err(e);
188        }
189        Ok(())
190    }
191
192    /// End the connection.
193    pub async fn shutdown(&mut self) -> io::Result<()> {
194        self.write_stream.shutdown().await
195    }
196}
197
198impl<R> ReadConnection<R>
199where
200    R: ProtocolPacket + Debug,
201{
202    /// Read a packet from the stream.
203    pub async fn read(&mut self) -> Result<R, Box<ReadPacketError>> {
204        let raw_packet = self.raw.read().await?;
205        deserialize_packet(&mut Cursor::new(&raw_packet))
206    }
207
208    /// Try to read a packet from the stream, or return Ok(None) if there's no
209    /// packet.
210    pub fn try_read(&mut self) -> Result<Option<R>, Box<ReadPacketError>> {
211        let Some(raw_packet) = self.raw.try_read()? else {
212            return Ok(None);
213        };
214        Ok(Some(deserialize_packet(&mut Cursor::new(&raw_packet))?))
215    }
216}
217impl<W> WriteConnection<W>
218where
219    W: ProtocolPacket + Debug,
220{
221    /// Write a packet to the server.
222    pub async fn write(&mut self, packet: W) -> io::Result<()> {
223        self.raw.write(&serialize_packet(&packet).unwrap()).await
224    }
225
226    /// End the connection.
227    pub async fn shutdown(&mut self) -> io::Result<()> {
228        self.raw.shutdown().await
229    }
230}
231
232impl<R, W> Connection<R, W>
233where
234    R: ProtocolPacket + Debug,
235    W: ProtocolPacket + Debug,
236{
237    /// Read a packet from the other side of the connection.
238    pub async fn read(&mut self) -> Result<R, Box<ReadPacketError>> {
239        self.reader.read().await
240    }
241
242    /// Try to read a packet from the other side of the connection, or return
243    /// Ok(None) if there's no packet to read.
244    pub fn try_read(&mut self) -> Result<Option<R>, Box<ReadPacketError>> {
245        self.reader.try_read()
246    }
247
248    /// Write a packet to the other side of the connection.
249    pub async fn write(&mut self, packet: impl crate::packets::Packet<W>) -> io::Result<()> {
250        let packet = packet.into_variant();
251        self.writer.write(packet).await
252    }
253
254    /// Split the reader and writer into two objects.
255    ///
256    /// This doesn't allocate.
257    #[must_use]
258    pub fn into_split(self) -> (ReadConnection<R>, WriteConnection<W>) {
259        (self.reader, self.writer)
260    }
261
262    /// Split the reader and writer into the state-agnostic
263    /// [`RawReadConnection`] and [`RawWriteConnection`] types.
264    ///
265    /// This is meant to help with some types of proxies.
266    #[must_use]
267    pub fn into_split_raw(self) -> (RawReadConnection, RawWriteConnection) {
268        (self.reader.raw, self.writer.raw)
269    }
270}
271
/// The error returned by the `Connection` constructors in this module.
#[derive(Debug, Error)]
pub enum ConnectionError {
    /// An underlying I/O error; displayed using the inner error's message.
    #[error("{0}")]
    Io(#[from] io::Error),
}
277
278use socks5_impl::protocol::UserKey;
279
/// An address and authentication method for connecting to a SOCKS5 proxy.
///
/// Its `Display` output has the form `socks5://[auth@]addr`.
#[derive(Clone, Debug)]
pub struct Proxy {
    /// The socket address of the proxy server itself.
    pub addr: SocketAddr,
    /// Credentials for the proxy, or `None` to connect without any.
    pub auth: Option<UserKey>,
}
286
287impl Proxy {
288    pub fn new(addr: SocketAddr, auth: Option<UserKey>) -> Self {
289        Self { addr, auth }
290    }
291}
292impl Display for Proxy {
293    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
294        write!(f, "socks5://")?;
295        if let Some(auth) = &self.auth {
296            write!(f, "{auth}@")?;
297        }
298        write!(f, "{}", self.addr)
299    }
300}
301
#[cfg(feature = "online-mode")]
impl From<Proxy> for reqwest::Proxy {
    /// Convert by rendering the proxy as a URL string and letting `reqwest`
    /// parse it.
    ///
    /// # Panics
    ///
    /// Panics if `reqwest` rejects the rendered URL.
    fn from(proxy: Proxy) -> Self {
        let url = proxy.to_string();
        match reqwest::Proxy::all(url) {
            Ok(parsed) => parsed,
            Err(e) => panic!(
                "azalea proxies should not fail to parse as reqwest proxies: {e:?}"
            ),
        }
    }
}
309
310impl Connection<ClientboundHandshakePacket, ServerboundHandshakePacket> {
311    /// Create a new connection to the given address.
312    pub async fn new(address: &SocketAddr) -> Result<Self, ConnectionError> {
313        let stream = TcpStream::connect(address).await?;
314
315        // enable tcp_nodelay
316        stream.set_nodelay(true)?;
317
318        Self::new_from_stream(stream).await
319    }
320
321    /// Create a new connection to the given address and SOCKS5 proxy.
322    ///
323    /// If you're not using a proxy, use [`Self::new`] instead.
324    pub async fn new_with_proxy(
325        address: &SocketAddr,
326        proxy: Proxy,
327    ) -> Result<Self, ConnectionError> {
328        let proxy_stream = TcpStream::connect(proxy.addr).await?;
329        let mut stream = BufStream::new(proxy_stream);
330
331        let _ = socks5_impl::client::connect(&mut stream, address, proxy.auth)
332            .await
333            .map_err(io::Error::other)?;
334
335        Self::new_from_stream(stream.into_inner()).await
336    }
337
338    /// Create a new connection from an existing stream.
339    ///
340    /// Useful if you want to set custom options on the stream. Otherwise, just
341    /// use [`Self::new`].
342    pub async fn new_from_stream(stream: TcpStream) -> Result<Self, ConnectionError> {
343        let (read_stream, write_stream) = stream.into_split();
344
345        Ok(Connection {
346            reader: ReadConnection {
347                raw: RawReadConnection {
348                    read_stream,
349                    buffer: Cursor::new(Vec::new()),
350                    compression_threshold: None,
351                    dec_cipher: None,
352                },
353                _reading: PhantomData,
354            },
355            writer: WriteConnection {
356                raw: RawWriteConnection {
357                    write_stream,
358                    compression_threshold: None,
359                    enc_cipher: None,
360                },
361                _writing: PhantomData,
362            },
363        })
364    }
365
366    /// Change our state from handshake to login.
367    ///
368    /// This is the state that is used for logging in.
369    #[must_use]
370    pub fn login(self) -> Connection<ClientboundLoginPacket, ServerboundLoginPacket> {
371        Connection::from(self)
372    }
373
374    /// Change our state from handshake to status.
375    ///
376    /// This is the state that is used for pinging the server.
377    #[must_use]
378    pub fn status(self) -> Connection<ClientboundStatusPacket, ServerboundStatusPacket> {
379        Connection::from(self)
380    }
381}
382
383impl Connection<ClientboundLoginPacket, ServerboundLoginPacket> {
384    /// Set our compression threshold, i.e. the maximum size that a packet is
385    /// allowed to be without getting compressed.
386    ///
387    /// Setting it to 0 means every packet will be compressed. If you set it to
388    /// less than 0 then compression is disabled.
389    pub fn set_compression_threshold(&mut self, threshold: i32) {
390        // if you pass a threshold of less than 0, compression is disabled
391        if threshold >= 0 {
392            self.reader.raw.compression_threshold = Some(threshold as u32);
393            self.writer.raw.compression_threshold = Some(threshold as u32);
394        } else {
395            self.reader.raw.compression_threshold = None;
396            self.writer.raw.compression_threshold = None;
397        }
398    }
399
400    /// Set the encryption key that is used to encrypt and decrypt packets.
401    ///
402    /// It's the same for both reading and writing.
403    pub fn set_encryption_key(&mut self, key: [u8; 16]) {
404        let (enc_cipher, dec_cipher) = azalea_crypto::create_cipher(&key);
405        self.reader.raw.dec_cipher = Some(dec_cipher);
406        self.writer.raw.enc_cipher = Some(enc_cipher);
407    }
408
409    /// Change our state from login to configuration.
410    ///
411    /// This is the state where the server sends us the registries and the
412    /// resource pack.
413    #[must_use]
414    pub fn config(self) -> Connection<ClientboundConfigPacket, ServerboundConfigPacket> {
415        Connection::from(self)
416    }
417
418    /// Authenticate with Minecraft's servers, which is required to join
419    /// online-mode servers.
420    ///
421    /// This must happen when you get a `ClientboundLoginPacket::Hello` packet.
422    ///
423    /// # Examples
424    ///
425    /// ```rust,no_run
426    /// use azalea_auth::AuthResult;
427    /// use azalea_protocol::{
428    ///     connect::Connection,
429    ///     packets::login::{ClientboundLoginPacket, ServerboundKey},
430    /// };
431    /// use uuid::Uuid;
432    /// # use azalea_protocol::ServerAddress;
433    ///
434    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
435    /// let AuthResult {
436    ///     access_token,
437    ///     profile,
438    /// } = azalea_auth::auth("[email protected]", azalea_auth::AuthOpts::default())
439    ///     .await
440    ///     .expect("Couldn't authenticate");
441    /// #
442    /// # let address = ServerAddress::try_from("[email protected]").unwrap();
443    /// # let resolved_address = azalea_protocol::resolver::resolve_address(&address).await?;
444    ///
445    /// let mut conn = Connection::new(&resolved_address).await?;
446    ///
447    /// // transition to the login state, in a real program we would have done a handshake first
448    /// let mut conn = conn.login();
449    ///
450    /// match conn.read().await? {
451    ///     ClientboundLoginPacket::Hello(p) => {
452    ///         // tell Mojang we're joining the server & enable encryption
453    ///         let e = azalea_crypto::encrypt(&p.public_key, &p.challenge).unwrap();
454    ///         conn.authenticate(&access_token, &profile.id, e.secret_key, &p, None)
455    ///             .await?;
456    ///         conn.write(ServerboundKey {
457    ///             key_bytes: e.encrypted_public_key,
458    ///             encrypted_challenge: e.encrypted_challenge,
459    ///         })
460    ///         .await?;
461    ///         conn.set_encryption_key(e.secret_key);
462    ///     }
463    ///     _ => {}
464    /// }
465    /// # Ok(())
466    /// # }
467    /// ```
468    #[cfg(feature = "online-mode")]
469    pub async fn authenticate(
470        &self,
471        access_token: &str,
472        uuid: &Uuid,
473        private_key: [u8; 16],
474        packet: &ClientboundHello,
475        sessionserver_proxy: Option<Proxy>,
476    ) -> Result<(), ClientSessionServerError> {
477        use azalea_auth::sessionserver::{self, SessionServerJoinOpts};
478
479        sessionserver::join(SessionServerJoinOpts {
480            access_token,
481            public_key: &packet.public_key,
482            private_key: &private_key,
483            uuid,
484            server_id: &packet.server_id,
485            proxy: sessionserver_proxy.map(Proxy::into),
486        })
487        .await
488    }
489}
490
491impl Connection<ServerboundHandshakePacket, ClientboundHandshakePacket> {
492    /// Change our state from handshake to login.
493    ///
494    /// This is the state that is used while negotiating encryption and
495    /// authenticating with Mojang.
496    #[must_use]
497    pub fn login(self) -> Connection<ServerboundLoginPacket, ClientboundLoginPacket> {
498        Connection::from(self)
499    }
500
501    /// Change our state from handshake to status.
502    ///
503    /// This is the state that is used for pinging the server.
504    #[must_use]
505    pub fn status(self) -> Connection<ServerboundStatusPacket, ClientboundStatusPacket> {
506        Connection::from(self)
507    }
508}
509
510impl Connection<ServerboundLoginPacket, ClientboundLoginPacket> {
511    /// Set our compression threshold, i.e. the maximum size that a packet is
512    /// allowed to be without getting compressed.
513    ///
514    /// If you set it to less than 0 then compression gets disabled.
515    pub fn set_compression_threshold(&mut self, threshold: i32) {
516        // if you pass a threshold of less than 0, compression is disabled
517        if threshold >= 0 {
518            self.reader.raw.compression_threshold = Some(threshold as u32);
519            self.writer.raw.compression_threshold = Some(threshold as u32);
520        } else {
521            self.reader.raw.compression_threshold = None;
522            self.writer.raw.compression_threshold = None;
523        }
524    }
525
526    /// Set the encryption key that is used to encrypt and decrypt packets.
527    ///
528    /// It's the same for both reading and writing.
529    pub fn set_encryption_key(&mut self, key: [u8; 16]) {
530        let (enc_cipher, dec_cipher) = azalea_crypto::create_cipher(&key);
531        self.reader.raw.dec_cipher = Some(dec_cipher);
532        self.writer.raw.enc_cipher = Some(enc_cipher);
533    }
534
535    /// Change our state from login to game.
536    ///
537    /// This is the state that's used when the client is actually in the game.
538    #[must_use]
539    pub fn game(self) -> Connection<ServerboundGamePacket, ClientboundGamePacket> {
540        Connection::from(self)
541    }
542
543    /// Verify connecting clients have authenticated with Minecraft's servers.
544    /// This must happen after the client sends a `ServerboundLoginPacket::Key`
545    /// packet.
546    #[cfg(feature = "online-mode")]
547    pub async fn authenticate(
548        &self,
549        username: &str,
550        public_key: &[u8],
551        private_key: &[u8; 16],
552        ip: Option<&str>,
553    ) -> Result<GameProfile, ServerSessionServerError> {
554        azalea_auth::sessionserver::serverside_auth(username, public_key, private_key, ip).await
555    }
556
557    /// Change our state back to configuration.
558    #[must_use]
559    pub fn config(self) -> Connection<ServerboundConfigPacket, ClientboundConfigPacket> {
560        Connection::from(self)
561    }
562}
563
564impl Connection<ServerboundConfigPacket, ClientboundConfigPacket> {
565    /// Change our state from configuration to game.
566    ///
567    /// This is the state that's used when the client is actually in the world.
568    #[must_use]
569    pub fn game(self) -> Connection<ServerboundGamePacket, ClientboundGamePacket> {
570        Connection::from(self)
571    }
572}
573
574impl Connection<ClientboundConfigPacket, ServerboundConfigPacket> {
575    /// Change our state from configuration to game.
576    ///
577    /// This is the state that's used when the client is actually in the world.
578    #[must_use]
579    pub fn game(self) -> Connection<ClientboundGamePacket, ServerboundGamePacket> {
580        Connection::from(self)
581    }
582}
583
584impl Connection<ClientboundGamePacket, ServerboundGamePacket> {
585    /// Change our state back to configuration.
586    #[must_use]
587    pub fn config(self) -> Connection<ClientboundConfigPacket, ServerboundConfigPacket> {
588        Connection::from(self)
589    }
590}
591impl Connection<ServerboundGamePacket, ClientboundGamePacket> {
592    /// Change our state back to configuration.
593    #[must_use]
594    pub fn config(self) -> Connection<ServerboundConfigPacket, ClientboundConfigPacket> {
595        Connection::from(self)
596    }
597}
598
599// rust doesn't let us implement From because allegedly it conflicts with
600// `core`'s "impl<T> From<T> for T" so we do this instead
601impl<R1, W1> Connection<R1, W1>
602where
603    R1: ProtocolPacket + Debug,
604    W1: ProtocolPacket + Debug,
605{
606    /// Creates a `Connection` of a type from a `Connection` of another type.
607    /// Useful for servers or custom packets.
608    #[must_use]
609    pub fn from<R2, W2>(connection: Connection<R1, W1>) -> Connection<R2, W2>
610    where
611        R2: ProtocolPacket + Debug,
612        W2: ProtocolPacket + Debug,
613    {
614        Connection {
615            reader: ReadConnection {
616                raw: connection.reader.raw,
617                _reading: PhantomData,
618            },
619            writer: WriteConnection {
620                raw: connection.writer.raw,
621                _writing: PhantomData,
622            },
623        }
624    }
625
626    /// Convert an existing `TcpStream` into a `Connection`. Useful for servers.
627    pub fn wrap(stream: TcpStream) -> Connection<R1, W1> {
628        let (read_stream, write_stream) = stream.into_split();
629
630        Connection {
631            reader: ReadConnection {
632                raw: RawReadConnection {
633                    read_stream,
634                    buffer: Cursor::new(Vec::new()),
635                    compression_threshold: None,
636                    dec_cipher: None,
637                },
638                _reading: PhantomData,
639            },
640            writer: WriteConnection {
641                raw: RawWriteConnection {
642                    write_stream,
643                    compression_threshold: None,
644                    enc_cipher: None,
645                },
646                _writing: PhantomData,
647            },
648        }
649    }
650
651    /// Convert from a `Connection` into a `TcpStream`. Useful for servers.
652    pub fn unwrap(self) -> Result<TcpStream, ReuniteError> {
653        self.reader
654            .raw
655            .read_stream
656            .reunite(self.writer.raw.write_stream)
657    }
658}