azalea_protocol/
connect.rs

1//! Connect to remote servers/clients.
2
3use std::{
4    fmt::{self, Debug, Display},
5    io::{self, Cursor},
6    marker::PhantomData,
7    net::SocketAddr,
8};
9
10use azalea_auth::{
11    game_profile::GameProfile,
12    sessionserver::{ClientSessionServerError, ServerSessionServerError},
13};
14use azalea_crypto::{Aes128CfbDec, Aes128CfbEnc};
15use thiserror::Error;
16use tokio::{
17    io::{AsyncWriteExt, BufStream},
18    net::{
19        TcpStream,
20        tcp::{OwnedReadHalf, OwnedWriteHalf, ReuniteError},
21    },
22};
23use tracing::{error, info};
24use uuid::Uuid;
25
26use crate::{
27    packets::{
28        ProtocolPacket,
29        config::{ClientboundConfigPacket, ServerboundConfigPacket},
30        game::{ClientboundGamePacket, ServerboundGamePacket},
31        handshake::{ClientboundHandshakePacket, ServerboundHandshakePacket},
32        login::{ClientboundLoginPacket, ServerboundLoginPacket, c_hello::ClientboundHello},
33        status::{ClientboundStatusPacket, ServerboundStatusPacket},
34    },
35    read::{ReadPacketError, deserialize_packet, read_raw_packet, try_read_raw_packet},
36    write::{serialize_packet, write_raw_packet},
37};
38
/// The state-agnostic read half of a connection.
///
/// Unlike [`ReadConnection`], this carries no packet type: it yields raw
/// packet bytes and leaves deserialization to the caller.
pub struct RawReadConnection {
    /// The owned read half of the underlying TCP stream.
    pub read_stream: OwnedReadHalf,
    /// Buffer that is handed to the raw packet readers together with the
    /// stream.
    pub buffer: Cursor<Vec<u8>>,
    /// Compression threshold passed to the raw packet readers. `None` until a
    /// threshold is set on the connection.
    pub compression_threshold: Option<u32>,
    /// Decryption cipher passed to the raw packet readers. `None` until an
    /// encryption key is set on the connection.
    pub dec_cipher: Option<Aes128CfbDec>,
}
45
/// The state-agnostic write half of a connection.
///
/// Unlike [`WriteConnection`], this carries no packet type: it accepts
/// already-serialized packet bytes.
pub struct RawWriteConnection {
    /// The owned write half of the underlying TCP stream.
    pub write_stream: OwnedWriteHalf,
    /// Compression threshold passed to the raw packet writer. `None` until a
    /// threshold is set on the connection.
    pub compression_threshold: Option<u32>,
    /// Encryption cipher passed to the raw packet writer. `None` until an
    /// encryption key is set on the connection.
    pub enc_cipher: Option<Aes128CfbEnc>,
}
51
52/// The read half of a connection.
53pub struct ReadConnection<R: ProtocolPacket> {
54    pub raw: RawReadConnection,
55    _reading: PhantomData<R>,
56}
57
58/// The write half of a connection.
59pub struct WriteConnection<W: ProtocolPacket> {
60    pub raw: RawWriteConnection,
61    _writing: PhantomData<W>,
62}
63
64/// A connection that can read and write packets.
65///
66/// # Examples
67///
68/// Join an offline-mode server and go through the handshake.
69/// ```rust,no_run
70/// use azalea_protocol::{
71///     connect::Connection,
72///     packets::{
73///         self, ClientIntention, PROTOCOL_VERSION,
74///         handshake::ServerboundIntention,
75///         login::{ClientboundLoginPacket, ServerboundHello, ServerboundKey},
76///     },
77///     resolver,
78/// };
79///
80/// #[tokio::main]
81/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
82///     let resolved_address = resolver::resolve_address(&"localhost".try_into().unwrap()).await?;
83///     let mut conn = Connection::new(&resolved_address).await?;
84///
85///     // handshake
86///     conn.write(ServerboundIntention {
87///         protocol_version: PROTOCOL_VERSION,
88///         hostname: resolved_address.ip().to_string(),
89///         port: resolved_address.port(),
90///         intention: ClientIntention::Login,
91///     })
92///     .await?;
93///
94///     let mut conn = conn.login();
95///
96///     // login
97///     conn.write(ServerboundHello {
98///         name: "bot".to_string(),
99///         profile_id: uuid::Uuid::nil(),
100///     })
101///     .await?;
102///
103///     let (conn, game_profile) = loop {
104///         let packet = conn.read().await?;
105///         match packet {
106///             ClientboundLoginPacket::Hello(p) => {
107///                 let e = azalea_crypto::encrypt(&p.public_key, &p.challenge).unwrap();
108///
109///                 conn.write(ServerboundKey {
110///                     key_bytes: e.encrypted_public_key,
111///                     encrypted_challenge: e.encrypted_challenge,
112///                 })
113///                 .await?;
114///                 conn.set_encryption_key(e.secret_key);
115///             }
116///             ClientboundLoginPacket::LoginCompression(p) => {
117///                 conn.set_compression_threshold(p.compression_threshold);
118///             }
119///             ClientboundLoginPacket::LoginFinished(p) => {
120///                 break (conn.config(), p.game_profile);
121///             }
122///             ClientboundLoginPacket::LoginDisconnect(p) => {
123///                 eprintln!("login disconnect: {}", p.reason);
124///                 return Err("login disconnect".into());
125///             }
126///             ClientboundLoginPacket::CustomQuery(p) => {}
127///             ClientboundLoginPacket::CookieRequest(p) => {
128///                 conn.write(packets::login::ServerboundCookieResponse {
129///                     key: p.key,
130///                     payload: None,
131///                 })
132///                 .await?;
133///             }
134///         }
135///     };
136///
137///     Ok(())
138/// }
139/// ```
140pub struct Connection<R: ProtocolPacket, W: ProtocolPacket> {
141    pub reader: ReadConnection<R>,
142    pub writer: WriteConnection<W>,
143}
144
145impl RawReadConnection {
146    pub async fn read(&mut self) -> Result<Box<[u8]>, Box<ReadPacketError>> {
147        read_raw_packet::<_>(
148            &mut self.read_stream,
149            &mut self.buffer,
150            self.compression_threshold,
151            &mut self.dec_cipher,
152        )
153        .await
154    }
155
156    pub fn try_read(&mut self) -> Result<Option<Box<[u8]>>, Box<ReadPacketError>> {
157        try_read_raw_packet::<_>(
158            &mut self.read_stream,
159            &mut self.buffer,
160            self.compression_threshold,
161            &mut self.dec_cipher,
162        )
163    }
164}
165
166impl RawWriteConnection {
167    pub async fn write(&mut self, packet: &[u8]) -> io::Result<()> {
168        if let Err(e) = write_raw_packet(
169            packet,
170            &mut self.write_stream,
171            self.compression_threshold,
172            &mut self.enc_cipher,
173        )
174        .await
175        {
176            // detect broken pipe
177            if e.kind() == io::ErrorKind::BrokenPipe {
178                info!("Broken pipe, shutting down connection.");
179                if let Err(e) = self.shutdown().await {
180                    error!("Couldn't shut down: {}", e);
181                }
182            }
183            return Err(e);
184        }
185        Ok(())
186    }
187
188    /// End the connection.
189    pub async fn shutdown(&mut self) -> io::Result<()> {
190        self.write_stream.shutdown().await
191    }
192}
193
194impl<R> ReadConnection<R>
195where
196    R: ProtocolPacket + Debug,
197{
198    /// Read a packet from the stream.
199    pub async fn read(&mut self) -> Result<R, Box<ReadPacketError>> {
200        let raw_packet = self.raw.read().await?;
201        deserialize_packet(&mut Cursor::new(&raw_packet))
202    }
203
204    /// Try to read a packet from the stream, or return Ok(None) if there's no
205    /// packet.
206    pub fn try_read(&mut self) -> Result<Option<R>, Box<ReadPacketError>> {
207        let Some(raw_packet) = self.raw.try_read()? else {
208            return Ok(None);
209        };
210        Ok(Some(deserialize_packet(&mut Cursor::new(&raw_packet))?))
211    }
212}
213impl<W> WriteConnection<W>
214where
215    W: ProtocolPacket + Debug,
216{
217    /// Write a packet to the server.
218    pub async fn write(&mut self, packet: W) -> io::Result<()> {
219        self.raw.write(&serialize_packet(&packet).unwrap()).await
220    }
221
222    /// End the connection.
223    pub async fn shutdown(&mut self) -> io::Result<()> {
224        self.raw.shutdown().await
225    }
226}
227
228impl<R, W> Connection<R, W>
229where
230    R: ProtocolPacket + Debug,
231    W: ProtocolPacket + Debug,
232{
233    /// Read a packet from the other side of the connection.
234    pub async fn read(&mut self) -> Result<R, Box<ReadPacketError>> {
235        self.reader.read().await
236    }
237
238    /// Try to read a packet from the other side of the connection, or return
239    /// Ok(None) if there's no packet to read.
240    pub fn try_read(&mut self) -> Result<Option<R>, Box<ReadPacketError>> {
241        self.reader.try_read()
242    }
243
244    /// Write a packet to the other side of the connection.
245    pub async fn write(&mut self, packet: impl crate::packets::Packet<W>) -> io::Result<()> {
246        let packet = packet.into_variant();
247        self.writer.write(packet).await
248    }
249
250    /// Split the reader and writer into two objects.
251    ///
252    /// This doesn't allocate.
253    #[must_use]
254    pub fn into_split(self) -> (ReadConnection<R>, WriteConnection<W>) {
255        (self.reader, self.writer)
256    }
257
258    /// Split the reader and writer into the state-agnostic
259    /// [`RawReadConnection`] and [`RawWriteConnection`] types.
260    ///
261    /// This is meant to help with some types of proxies.
262    #[must_use]
263    pub fn into_split_raw(self) -> (RawReadConnection, RawWriteConnection) {
264        (self.reader.raw, self.writer.raw)
265    }
266}
267
/// The error returned by the [`Connection`] constructors in this module.
#[derive(Error, Debug)]
pub enum ConnectionError {
    /// An I/O error. It is displayed using the inner error's own message, and
    /// any [`io::Error`] converts into this variant through `From`.
    #[error("{0}")]
    Io(#[from] io::Error),
}
273
274use socks5_impl::protocol::UserKey;
275
/// An address and authentication method for connecting to a Socks5 proxy.
#[derive(Debug, Clone)]
pub struct Proxy {
    /// The socket address of the proxy itself (not of the final destination).
    pub addr: SocketAddr,
    /// Credentials handed to the Socks5 client when connecting, or `None` to
    /// pass no credentials.
    pub auth: Option<UserKey>,
}
282
283impl Proxy {
284    pub fn new(addr: SocketAddr, auth: Option<UserKey>) -> Self {
285        Self { addr, auth }
286    }
287}
288impl Display for Proxy {
289    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
290        write!(f, "socks5://")?;
291        if let Some(auth) = &self.auth {
292            write!(f, "{auth}@")?;
293        }
294        write!(f, "{}", self.addr)
295    }
296}
297
298impl Connection<ClientboundHandshakePacket, ServerboundHandshakePacket> {
299    /// Create a new connection to the given address.
300    pub async fn new(address: &SocketAddr) -> Result<Self, ConnectionError> {
301        let stream = TcpStream::connect(address).await?;
302
303        // enable tcp_nodelay
304        stream.set_nodelay(true)?;
305
306        Self::new_from_stream(stream).await
307    }
308
309    /// Create a new connection to the given address and Socks5 proxy.
310    ///
311    /// If you're not using a proxy, use [`Self::new`] instead.
312    pub async fn new_with_proxy(
313        address: &SocketAddr,
314        proxy: Proxy,
315    ) -> Result<Self, ConnectionError> {
316        let proxy_stream = TcpStream::connect(proxy.addr).await?;
317        let mut stream = BufStream::new(proxy_stream);
318
319        let _ = socks5_impl::client::connect(&mut stream, address, proxy.auth)
320            .await
321            .map_err(io::Error::other)?;
322
323        Self::new_from_stream(stream.into_inner()).await
324    }
325
326    /// Create a new connection from an existing stream.
327    ///
328    /// Useful if you want to set custom options on the stream. Otherwise, just
329    /// use [`Self::new`].
330    pub async fn new_from_stream(stream: TcpStream) -> Result<Self, ConnectionError> {
331        let (read_stream, write_stream) = stream.into_split();
332
333        Ok(Connection {
334            reader: ReadConnection {
335                raw: RawReadConnection {
336                    read_stream,
337                    buffer: Cursor::new(Vec::new()),
338                    compression_threshold: None,
339                    dec_cipher: None,
340                },
341                _reading: PhantomData,
342            },
343            writer: WriteConnection {
344                raw: RawWriteConnection {
345                    write_stream,
346                    compression_threshold: None,
347                    enc_cipher: None,
348                },
349                _writing: PhantomData,
350            },
351        })
352    }
353
354    /// Change our state from handshake to login.
355    ///
356    /// This is the state that is used for logging in.
357    #[must_use]
358    pub fn login(self) -> Connection<ClientboundLoginPacket, ServerboundLoginPacket> {
359        Connection::from(self)
360    }
361
362    /// Change our state from handshake to status.
363    ///
364    /// This is the state that is used for pinging the server.
365    #[must_use]
366    pub fn status(self) -> Connection<ClientboundStatusPacket, ServerboundStatusPacket> {
367        Connection::from(self)
368    }
369}
370
371impl Connection<ClientboundLoginPacket, ServerboundLoginPacket> {
372    /// Set our compression threshold, i.e. the maximum size that a packet is
373    /// allowed to be without getting compressed.
374    ///
375    /// Setting it to 0 means every packet will be compressed. If you set it to
376    /// less than 0 then compression is disabled.
377    pub fn set_compression_threshold(&mut self, threshold: i32) {
378        // if you pass a threshold of less than 0, compression is disabled
379        if threshold >= 0 {
380            self.reader.raw.compression_threshold = Some(threshold as u32);
381            self.writer.raw.compression_threshold = Some(threshold as u32);
382        } else {
383            self.reader.raw.compression_threshold = None;
384            self.writer.raw.compression_threshold = None;
385        }
386    }
387
388    /// Set the encryption key that is used to encrypt and decrypt packets.
389    ///
390    /// It's the same for both reading and writing.
391    pub fn set_encryption_key(&mut self, key: [u8; 16]) {
392        let (enc_cipher, dec_cipher) = azalea_crypto::create_cipher(&key);
393        self.reader.raw.dec_cipher = Some(dec_cipher);
394        self.writer.raw.enc_cipher = Some(enc_cipher);
395    }
396
397    /// Change our state from login to configuration.
398    ///
399    /// This is the state where the server sends us the registries and the
400    /// resource pack.
401    #[must_use]
402    pub fn config(self) -> Connection<ClientboundConfigPacket, ServerboundConfigPacket> {
403        Connection::from(self)
404    }
405
406    /// Authenticate with Minecraft's servers, which is required to join
407    /// online-mode servers.
408    ///
409    /// This must happen when you get a `ClientboundLoginPacket::Hello` packet.
410    ///
411    /// # Examples
412    ///
413    /// ```rust,no_run
414    /// use azalea_auth::AuthResult;
415    /// use azalea_protocol::{
416    ///     connect::Connection,
417    ///     packets::login::{ClientboundLoginPacket, ServerboundKey},
418    /// };
419    /// use uuid::Uuid;
420    /// # use azalea_protocol::ServerAddress;
421    ///
422    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
423    /// let AuthResult {
424    ///     access_token,
425    ///     profile,
426    /// } = azalea_auth::auth("[email protected]", azalea_auth::AuthOpts::default())
427    ///     .await
428    ///     .expect("Couldn't authenticate");
429    /// #
430    /// # let address = ServerAddress::try_from("[email protected]").unwrap();
431    /// # let resolved_address = azalea_protocol::resolver::resolve_address(&address).await?;
432    ///
433    /// let mut conn = Connection::new(&resolved_address).await?;
434    ///
435    /// // transition to the login state, in a real program we would have done a handshake first
436    /// let mut conn = conn.login();
437    ///
438    /// match conn.read().await? {
439    ///     ClientboundLoginPacket::Hello(p) => {
440    ///         // tell Mojang we're joining the server & enable encryption
441    ///         let e = azalea_crypto::encrypt(&p.public_key, &p.challenge).unwrap();
442    ///         conn.authenticate(&access_token, &profile.id, e.secret_key, &p)
443    ///             .await?;
444    ///         conn.write(ServerboundKey {
445    ///             key_bytes: e.encrypted_public_key,
446    ///             encrypted_challenge: e.encrypted_challenge,
447    ///         })
448    ///         .await?;
449    ///         conn.set_encryption_key(e.secret_key);
450    ///     }
451    ///     _ => {}
452    /// }
453    /// # Ok(())
454    /// # }
455    /// ```
456    pub async fn authenticate(
457        &self,
458        access_token: &str,
459        uuid: &Uuid,
460        private_key: [u8; 16],
461        packet: &ClientboundHello,
462    ) -> Result<(), ClientSessionServerError> {
463        azalea_auth::sessionserver::join(
464            access_token,
465            &packet.public_key,
466            &private_key,
467            uuid,
468            &packet.server_id,
469        )
470        .await
471    }
472}
473
474impl Connection<ServerboundHandshakePacket, ClientboundHandshakePacket> {
475    /// Change our state from handshake to login.
476    ///
477    /// This is the state that is used while negotiating encryption and
478    /// authenticating with Mojang.
479    #[must_use]
480    pub fn login(self) -> Connection<ServerboundLoginPacket, ClientboundLoginPacket> {
481        Connection::from(self)
482    }
483
484    /// Change our state from handshake to status.
485    ///
486    /// This is the state that is used for pinging the server.
487    #[must_use]
488    pub fn status(self) -> Connection<ServerboundStatusPacket, ClientboundStatusPacket> {
489        Connection::from(self)
490    }
491}
492
493impl Connection<ServerboundLoginPacket, ClientboundLoginPacket> {
494    /// Set our compression threshold, i.e. the maximum size that a packet is
495    /// allowed to be without getting compressed.
496    ///
497    /// If you set it to less than 0 then compression gets disabled.
498    pub fn set_compression_threshold(&mut self, threshold: i32) {
499        // if you pass a threshold of less than 0, compression is disabled
500        if threshold >= 0 {
501            self.reader.raw.compression_threshold = Some(threshold as u32);
502            self.writer.raw.compression_threshold = Some(threshold as u32);
503        } else {
504            self.reader.raw.compression_threshold = None;
505            self.writer.raw.compression_threshold = None;
506        }
507    }
508
509    /// Set the encryption key that is used to encrypt and decrypt packets.
510    ///
511    /// It's the same for both reading and writing.
512    pub fn set_encryption_key(&mut self, key: [u8; 16]) {
513        let (enc_cipher, dec_cipher) = azalea_crypto::create_cipher(&key);
514        self.reader.raw.dec_cipher = Some(dec_cipher);
515        self.writer.raw.enc_cipher = Some(enc_cipher);
516    }
517
518    /// Change our state from login to game.
519    ///
520    /// This is the state that's used when the client is actually in the game.
521    #[must_use]
522    pub fn game(self) -> Connection<ServerboundGamePacket, ClientboundGamePacket> {
523        Connection::from(self)
524    }
525
526    /// Verify connecting clients have authenticated with Minecraft's servers.
527    /// This must happen after the client sends a `ServerboundLoginPacket::Key`
528    /// packet.
529    pub async fn authenticate(
530        &self,
531        username: &str,
532        public_key: &[u8],
533        private_key: &[u8; 16],
534        ip: Option<&str>,
535    ) -> Result<GameProfile, ServerSessionServerError> {
536        azalea_auth::sessionserver::serverside_auth(username, public_key, private_key, ip).await
537    }
538
539    /// Change our state back to configuration.
540    #[must_use]
541    pub fn config(self) -> Connection<ServerboundConfigPacket, ClientboundConfigPacket> {
542        Connection::from(self)
543    }
544}
545
546impl Connection<ServerboundConfigPacket, ClientboundConfigPacket> {
547    /// Change our state from configuration to game.
548    ///
549    /// This is the state that's used when the client is actually in the world.
550    #[must_use]
551    pub fn game(self) -> Connection<ServerboundGamePacket, ClientboundGamePacket> {
552        Connection::from(self)
553    }
554}
555
556impl Connection<ClientboundConfigPacket, ServerboundConfigPacket> {
557    /// Change our state from configuration to game.
558    ///
559    /// This is the state that's used when the client is actually in the world.
560    #[must_use]
561    pub fn game(self) -> Connection<ClientboundGamePacket, ServerboundGamePacket> {
562        Connection::from(self)
563    }
564}
565
566impl Connection<ClientboundGamePacket, ServerboundGamePacket> {
567    /// Change our state back to configuration.
568    #[must_use]
569    pub fn config(self) -> Connection<ClientboundConfigPacket, ServerboundConfigPacket> {
570        Connection::from(self)
571    }
572}
573impl Connection<ServerboundGamePacket, ClientboundGamePacket> {
574    /// Change our state back to configuration.
575    #[must_use]
576    pub fn config(self) -> Connection<ServerboundConfigPacket, ClientboundConfigPacket> {
577        Connection::from(self)
578    }
579}
580
581// rust doesn't let us implement From because allegedly it conflicts with
582// `core`'s "impl<T> From<T> for T" so we do this instead
583impl<R1, W1> Connection<R1, W1>
584where
585    R1: ProtocolPacket + Debug,
586    W1: ProtocolPacket + Debug,
587{
588    /// Creates a `Connection` of a type from a `Connection` of another type.
589    /// Useful for servers or custom packets.
590    #[must_use]
591    pub fn from<R2, W2>(connection: Connection<R1, W1>) -> Connection<R2, W2>
592    where
593        R2: ProtocolPacket + Debug,
594        W2: ProtocolPacket + Debug,
595    {
596        Connection {
597            reader: ReadConnection {
598                raw: connection.reader.raw,
599                _reading: PhantomData,
600            },
601            writer: WriteConnection {
602                raw: connection.writer.raw,
603                _writing: PhantomData,
604            },
605        }
606    }
607
608    /// Convert an existing `TcpStream` into a `Connection`. Useful for servers.
609    pub fn wrap(stream: TcpStream) -> Connection<R1, W1> {
610        let (read_stream, write_stream) = stream.into_split();
611
612        Connection {
613            reader: ReadConnection {
614                raw: RawReadConnection {
615                    read_stream,
616                    buffer: Cursor::new(Vec::new()),
617                    compression_threshold: None,
618                    dec_cipher: None,
619                },
620                _reading: PhantomData,
621            },
622            writer: WriteConnection {
623                raw: RawWriteConnection {
624                    write_stream,
625                    compression_threshold: None,
626                    enc_cipher: None,
627                },
628                _writing: PhantomData,
629            },
630        }
631    }
632
633    /// Convert from a `Connection` into a `TcpStream`. Useful for servers.
634    pub fn unwrap(self) -> Result<TcpStream, ReuniteError> {
635        self.reader
636            .raw
637            .read_stream
638            .reunite(self.writer.raw.write_stream)
639    }
640}