azalea_protocol/
connect.rs

1//! Connect to remote servers/clients.
2
3use std::{
4    fmt::{self, Debug, Display},
5    io::{self, Cursor},
6    marker::PhantomData,
7    net::SocketAddr,
8};
9
10use azalea_auth::{
11    game_profile::GameProfile,
12    sessionserver::{ClientSessionServerError, ServerSessionServerError},
13};
14use azalea_crypto::{Aes128CfbDec, Aes128CfbEnc};
15use thiserror::Error;
16use tokio::{
17    io::{AsyncWriteExt, BufStream},
18    net::{
19        TcpStream,
20        tcp::{OwnedReadHalf, OwnedWriteHalf, ReuniteError},
21    },
22};
23use tracing::{error, info};
24use uuid::Uuid;
25
26use crate::{
27    packets::{
28        ProtocolPacket,
29        config::{ClientboundConfigPacket, ServerboundConfigPacket},
30        game::{ClientboundGamePacket, ServerboundGamePacket},
31        handshake::{ClientboundHandshakePacket, ServerboundHandshakePacket},
32        login::{ClientboundLoginPacket, ServerboundLoginPacket, c_hello::ClientboundHello},
33        status::{ClientboundStatusPacket, ServerboundStatusPacket},
34    },
35    read::{ReadPacketError, deserialize_packet, read_raw_packet, try_read_raw_packet},
36    write::{serialize_packet, write_raw_packet},
37};
38
/// The read half of a connection, working on raw packet bytes rather than on
/// a specific packet type.
///
/// The fields are handed to the packet-reading functions on every call to
/// [`RawReadConnection::read`] / [`RawReadConnection::try_read`].
pub struct RawReadConnection {
    /// The read half of the underlying TCP stream.
    pub read_stream: OwnedReadHalf,
    /// Buffer that is passed (mutably) to the packet reader on every read.
    pub buffer: Cursor<Vec<u8>>,
    /// The compression threshold passed to the packet reader. `None` when no
    /// threshold has been set.
    pub compression_threshold: Option<u32>,
    /// The cipher passed to the packet reader. `None` when no encryption key
    /// has been set.
    pub dec_cipher: Option<Aes128CfbDec>,
}
45
/// The write half of a connection, working on raw packet bytes rather than on
/// a specific packet type.
///
/// The fields are handed to the packet-writing function on every call to
/// [`RawWriteConnection::write`].
pub struct RawWriteConnection {
    /// The write half of the underlying TCP stream.
    pub write_stream: OwnedWriteHalf,
    /// The compression threshold passed to the packet writer. `None` when no
    /// threshold has been set.
    pub compression_threshold: Option<u32>,
    /// The cipher passed to the packet writer. `None` when no encryption key
    /// has been set.
    pub enc_cipher: Option<Aes128CfbEnc>,
}
51
/// The read half of a connection.
///
/// This is a thin typed wrapper around a [`RawReadConnection`]: `R` is the
/// packet type that raw packets get deserialized into.
pub struct ReadConnection<R: ProtocolPacket> {
    /// The state-agnostic reader that produces the raw packet bytes.
    pub raw: RawReadConnection,
    /// Marker tying this reader to the packet type `R`; it stores no data.
    _reading: PhantomData<R>,
}
57
/// The write half of a connection.
///
/// This is a thin typed wrapper around a [`RawWriteConnection`]: `W` is the
/// packet type that gets serialized before being written.
pub struct WriteConnection<W: ProtocolPacket> {
    /// The state-agnostic writer that the serialized bytes are sent through.
    pub raw: RawWriteConnection,
    /// Marker tying this writer to the packet type `W`; it stores no data.
    _writing: PhantomData<W>,
}
63
/// A connection that can read and write packets.
///
/// `R` is the type of the packets that are read from the connection and `W`
/// is the type of the packets that are written to it. Changing protocol state
/// (for example with `login()` or `config()`) consumes the connection and
/// returns one with different packet types.
///
/// # Examples
///
/// Join an offline-mode server and go through the handshake.
/// ```rust,no_run
/// use azalea_protocol::{
///     connect::Connection,
///     packets::{
///         self, ClientIntention, PROTOCOL_VERSION,
///         handshake::ServerboundIntention,
///         login::{ClientboundLoginPacket, ServerboundHello, ServerboundKey},
///     },
///     resolver,
/// };
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
///     let resolved_address = resolver::resolve_address(&"localhost".try_into().unwrap()).await?;
///     let mut conn = Connection::new(&resolved_address).await?;
///
///     // handshake
///     conn.write(ServerboundIntention {
///         protocol_version: PROTOCOL_VERSION,
///         hostname: resolved_address.ip().to_string(),
///         port: resolved_address.port(),
///         intention: ClientIntention::Login,
///     })
///     .await?;
///
///     let mut conn = conn.login();
///
///     // login
///     conn.write(ServerboundHello {
///         name: "bot".to_string(),
///         profile_id: uuid::Uuid::nil(),
///     })
///     .await?;
///
///     let (conn, game_profile) = loop {
///         let packet = conn.read().await?;
///         match packet {
///             ClientboundLoginPacket::Hello(p) => {
///                 let e = azalea_crypto::encrypt(&p.public_key, &p.challenge).unwrap();
///
///                 conn.write(ServerboundKey {
///                     key_bytes: e.encrypted_public_key,
///                     encrypted_challenge: e.encrypted_challenge,
///                 })
///                 .await?;
///                 conn.set_encryption_key(e.secret_key);
///             }
///             ClientboundLoginPacket::LoginCompression(p) => {
///                 conn.set_compression_threshold(p.compression_threshold);
///             }
///             ClientboundLoginPacket::LoginFinished(p) => {
///                 break (conn.config(), p.game_profile);
///             }
///             ClientboundLoginPacket::LoginDisconnect(p) => {
///                 eprintln!("login disconnect: {}", p.reason);
///                 return Err("login disconnect".into());
///             }
///             ClientboundLoginPacket::CustomQuery(p) => {}
///             ClientboundLoginPacket::CookieRequest(p) => {
///                 conn.write(packets::login::ServerboundCookieResponse {
///                     key: p.key,
///                     payload: None,
///                 })
///                 .await?;
///             }
///         }
///     };
///
///     Ok(())
/// }
/// ```
pub struct Connection<R: ProtocolPacket, W: ProtocolPacket> {
    /// The half used for reading packets of type `R`.
    pub reader: ReadConnection<R>,
    /// The half used for writing packets of type `W`.
    pub writer: WriteConnection<W>,
}
144
145impl RawReadConnection {
146    pub async fn read(&mut self) -> Result<Box<[u8]>, Box<ReadPacketError>> {
147        read_raw_packet::<_>(
148            &mut self.read_stream,
149            &mut self.buffer,
150            self.compression_threshold,
151            &mut self.dec_cipher,
152        )
153        .await
154    }
155
156    pub fn try_read(&mut self) -> Result<Option<Box<[u8]>>, Box<ReadPacketError>> {
157        try_read_raw_packet::<_>(
158            &mut self.read_stream,
159            &mut self.buffer,
160            self.compression_threshold,
161            &mut self.dec_cipher,
162        )
163    }
164}
165
166impl RawWriteConnection {
167    pub async fn write(&mut self, packet: &[u8]) -> io::Result<()> {
168        if let Err(e) = write_raw_packet(
169            packet,
170            &mut self.write_stream,
171            self.compression_threshold,
172            &mut self.enc_cipher,
173        )
174        .await
175        {
176            // detect broken pipe
177            if e.kind() == io::ErrorKind::BrokenPipe {
178                info!("Broken pipe, shutting down connection.");
179                if let Err(e) = self.shutdown().await {
180                    error!("Couldn't shut down: {}", e);
181                }
182            }
183            return Err(e);
184        }
185        Ok(())
186    }
187
188    /// End the connection.
189    pub async fn shutdown(&mut self) -> io::Result<()> {
190        self.write_stream.shutdown().await
191    }
192}
193
194impl<R> ReadConnection<R>
195where
196    R: ProtocolPacket + Debug,
197{
198    /// Read a packet from the stream.
199    pub async fn read(&mut self) -> Result<R, Box<ReadPacketError>> {
200        let raw_packet = self.raw.read().await?;
201        deserialize_packet(&mut Cursor::new(&raw_packet))
202    }
203
204    /// Try to read a packet from the stream, or return Ok(None) if there's no
205    /// packet.
206    pub fn try_read(&mut self) -> Result<Option<R>, Box<ReadPacketError>> {
207        let Some(raw_packet) = self.raw.try_read()? else {
208            return Ok(None);
209        };
210        Ok(Some(deserialize_packet(&mut Cursor::new(&raw_packet))?))
211    }
212}
213impl<W> WriteConnection<W>
214where
215    W: ProtocolPacket + Debug,
216{
217    /// Write a packet to the server.
218    pub async fn write(&mut self, packet: W) -> io::Result<()> {
219        self.raw.write(&serialize_packet(&packet).unwrap()).await
220    }
221
222    /// End the connection.
223    pub async fn shutdown(&mut self) -> io::Result<()> {
224        self.raw.shutdown().await
225    }
226}
227
228impl<R, W> Connection<R, W>
229where
230    R: ProtocolPacket + Debug,
231    W: ProtocolPacket + Debug,
232{
233    /// Read a packet from the other side of the connection.
234    pub async fn read(&mut self) -> Result<R, Box<ReadPacketError>> {
235        self.reader.read().await
236    }
237
238    /// Try to read a packet from the other side of the connection, or return
239    /// Ok(None) if there's no packet to read.
240    pub fn try_read(&mut self) -> Result<Option<R>, Box<ReadPacketError>> {
241        self.reader.try_read()
242    }
243
244    /// Write a packet to the other side of the connection.
245    pub async fn write(&mut self, packet: impl crate::packets::Packet<W>) -> io::Result<()> {
246        let packet = packet.into_variant();
247        self.writer.write(packet).await
248    }
249
250    /// Split the reader and writer into two objects. This doesn't allocate.
251    #[must_use]
252    pub fn into_split(self) -> (ReadConnection<R>, WriteConnection<W>) {
253        (self.reader, self.writer)
254    }
255
256    /// Split the reader and writer into the state-agnostic
257    /// [`RawReadConnection`] and [`RawWriteConnection`] types.
258    ///
259    /// This is meant to help with some types of proxies.
260    #[must_use]
261    pub fn into_split_raw(self) -> (RawReadConnection, RawWriteConnection) {
262        (self.reader.raw, self.writer.raw)
263    }
264}
265
/// The error returned by the `Connection` constructors (`new`,
/// `new_with_proxy` and `new_from_stream`).
#[derive(Error, Debug)]
pub enum ConnectionError {
    /// An I/O error; displayed using the wrapped error's own message. Any
    /// [`io::Error`] converts into this variant via `?`.
    #[error("{0}")]
    Io(#[from] io::Error),
}
271
272use socks5_impl::protocol::UserKey;
273
274/// An address and authentication method for connecting to a Socks5 proxy.
275#[derive(Debug, Clone)]
276pub struct Proxy {
277    pub addr: SocketAddr,
278    pub auth: Option<UserKey>,
279}
280
281impl Proxy {
282    pub fn new(addr: SocketAddr, auth: Option<UserKey>) -> Self {
283        Self { addr, auth }
284    }
285}
286impl Display for Proxy {
287    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
288        write!(f, "socks5://")?;
289        if let Some(auth) = &self.auth {
290            write!(f, "{auth}@")?;
291        }
292        write!(f, "{}", self.addr)
293    }
294}
295
296impl Connection<ClientboundHandshakePacket, ServerboundHandshakePacket> {
297    /// Create a new connection to the given address.
298    pub async fn new(address: &SocketAddr) -> Result<Self, ConnectionError> {
299        let stream = TcpStream::connect(address).await?;
300
301        // enable tcp_nodelay
302        stream.set_nodelay(true)?;
303
304        Self::new_from_stream(stream).await
305    }
306
307    /// Create a new connection to the given address and Socks5 proxy. If you're
308    /// not using a proxy, use [`Self::new`] instead.
309    pub async fn new_with_proxy(
310        address: &SocketAddr,
311        proxy: Proxy,
312    ) -> Result<Self, ConnectionError> {
313        let proxy_stream = TcpStream::connect(proxy.addr).await?;
314        let mut stream = BufStream::new(proxy_stream);
315
316        let _ = socks5_impl::client::connect(&mut stream, address, proxy.auth)
317            .await
318            .map_err(io::Error::other)?;
319
320        Self::new_from_stream(stream.into_inner()).await
321    }
322
323    /// Create a new connection from an existing stream. Useful if you want to
324    /// set custom options on the stream. Otherwise, just use [`Self::new`].
325    pub async fn new_from_stream(stream: TcpStream) -> Result<Self, ConnectionError> {
326        let (read_stream, write_stream) = stream.into_split();
327
328        Ok(Connection {
329            reader: ReadConnection {
330                raw: RawReadConnection {
331                    read_stream,
332                    buffer: Cursor::new(Vec::new()),
333                    compression_threshold: None,
334                    dec_cipher: None,
335                },
336                _reading: PhantomData,
337            },
338            writer: WriteConnection {
339                raw: RawWriteConnection {
340                    write_stream,
341                    compression_threshold: None,
342                    enc_cipher: None,
343                },
344                _writing: PhantomData,
345            },
346        })
347    }
348
349    /// Change our state from handshake to login. This is the state that is used
350    /// for logging in.
351    #[must_use]
352    pub fn login(self) -> Connection<ClientboundLoginPacket, ServerboundLoginPacket> {
353        Connection::from(self)
354    }
355
356    /// Change our state from handshake to status. This is the state that is
357    /// used for pinging the server.
358    #[must_use]
359    pub fn status(self) -> Connection<ClientboundStatusPacket, ServerboundStatusPacket> {
360        Connection::from(self)
361    }
362}
363
364impl Connection<ClientboundLoginPacket, ServerboundLoginPacket> {
365    /// Set our compression threshold, i.e. the maximum size that a packet is
366    /// allowed to be without getting compressed. Setting it to 0 means every
367    /// packet will be compressed. If you set it to less than 0,
368    /// then compression is disabled.
369    pub fn set_compression_threshold(&mut self, threshold: i32) {
370        // if you pass a threshold of less than 0, compression is disabled
371        if threshold >= 0 {
372            self.reader.raw.compression_threshold = Some(threshold as u32);
373            self.writer.raw.compression_threshold = Some(threshold as u32);
374        } else {
375            self.reader.raw.compression_threshold = None;
376            self.writer.raw.compression_threshold = None;
377        }
378    }
379
380    /// Set the encryption key that is used to encrypt and decrypt packets. It's
381    /// the same for both reading and writing.
382    pub fn set_encryption_key(&mut self, key: [u8; 16]) {
383        let (enc_cipher, dec_cipher) = azalea_crypto::create_cipher(&key);
384        self.reader.raw.dec_cipher = Some(dec_cipher);
385        self.writer.raw.enc_cipher = Some(enc_cipher);
386    }
387
388    /// Change our state from login to configuration. This is the state where
389    /// the server sends us the registries and resource pack and stuff.
390    #[must_use]
391    pub fn config(self) -> Connection<ClientboundConfigPacket, ServerboundConfigPacket> {
392        Connection::from(self)
393    }
394
395    /// Authenticate with Minecraft's servers, which is required to join
396    /// online-mode servers. This must happen when you get a
397    /// `ClientboundLoginPacket::Hello` packet.
398    ///
399    /// # Examples
400    ///
401    /// ```rust,no_run
402    /// use azalea_auth::AuthResult;
403    /// use azalea_protocol::{
404    ///     connect::Connection,
405    ///     packets::login::{ClientboundLoginPacket, ServerboundKey},
406    /// };
407    /// use uuid::Uuid;
408    /// # use azalea_protocol::ServerAddress;
409    ///
410    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
411    /// let AuthResult {
412    ///     access_token,
413    ///     profile,
414    /// } = azalea_auth::auth("[email protected]", azalea_auth::AuthOpts::default())
415    ///     .await
416    ///     .expect("Couldn't authenticate");
417    /// #
418    /// # let address = ServerAddress::try_from("[email protected]").unwrap();
419    /// # let resolved_address = azalea_protocol::resolver::resolve_address(&address).await?;
420    ///
421    /// let mut conn = Connection::new(&resolved_address).await?;
422    ///
423    /// // transition to the login state, in a real program we would have done a handshake first
424    /// let mut conn = conn.login();
425    ///
426    /// match conn.read().await? {
427    ///     ClientboundLoginPacket::Hello(p) => {
428    ///         // tell Mojang we're joining the server & enable encryption
429    ///         let e = azalea_crypto::encrypt(&p.public_key, &p.challenge).unwrap();
430    ///         conn.authenticate(&access_token, &profile.id, e.secret_key, &p)
431    ///             .await?;
432    ///         conn.write(ServerboundKey {
433    ///             key_bytes: e.encrypted_public_key,
434    ///             encrypted_challenge: e.encrypted_challenge,
435    ///         })
436    ///         .await?;
437    ///         conn.set_encryption_key(e.secret_key);
438    ///     }
439    ///     _ => {}
440    /// }
441    /// # Ok(())
442    /// # }
443    /// ```
444    pub async fn authenticate(
445        &self,
446        access_token: &str,
447        uuid: &Uuid,
448        private_key: [u8; 16],
449        packet: &ClientboundHello,
450    ) -> Result<(), ClientSessionServerError> {
451        azalea_auth::sessionserver::join(
452            access_token,
453            &packet.public_key,
454            &private_key,
455            uuid,
456            &packet.server_id,
457        )
458        .await
459    }
460}
461
462impl Connection<ServerboundHandshakePacket, ClientboundHandshakePacket> {
463    /// Change our state from handshake to login. This is the state that is used
464    /// for logging in.
465    #[must_use]
466    pub fn login(self) -> Connection<ServerboundLoginPacket, ClientboundLoginPacket> {
467        Connection::from(self)
468    }
469
470    /// Change our state from handshake to status. This is the state that is
471    /// used for pinging the server.
472    #[must_use]
473    pub fn status(self) -> Connection<ServerboundStatusPacket, ClientboundStatusPacket> {
474        Connection::from(self)
475    }
476}
477
478impl Connection<ServerboundLoginPacket, ClientboundLoginPacket> {
479    /// Set our compression threshold, i.e. the maximum size that a packet is
480    /// allowed to be without getting compressed. If you set it to less than 0
481    /// then compression gets disabled.
482    pub fn set_compression_threshold(&mut self, threshold: i32) {
483        // if you pass a threshold of less than 0, compression is disabled
484        if threshold >= 0 {
485            self.reader.raw.compression_threshold = Some(threshold as u32);
486            self.writer.raw.compression_threshold = Some(threshold as u32);
487        } else {
488            self.reader.raw.compression_threshold = None;
489            self.writer.raw.compression_threshold = None;
490        }
491    }
492
493    /// Set the encryption key that is used to encrypt and decrypt packets. It's
494    /// the same for both reading and writing.
495    pub fn set_encryption_key(&mut self, key: [u8; 16]) {
496        let (enc_cipher, dec_cipher) = azalea_crypto::create_cipher(&key);
497        self.reader.raw.dec_cipher = Some(dec_cipher);
498        self.writer.raw.enc_cipher = Some(enc_cipher);
499    }
500
501    /// Change our state from login to game. This is the state that's used when
502    /// the client is actually in the game.
503    #[must_use]
504    pub fn game(self) -> Connection<ServerboundGamePacket, ClientboundGamePacket> {
505        Connection::from(self)
506    }
507
508    /// Verify connecting clients have authenticated with Minecraft's servers.
509    /// This must happen after the client sends a `ServerboundLoginPacket::Key`
510    /// packet.
511    pub async fn authenticate(
512        &self,
513        username: &str,
514        public_key: &[u8],
515        private_key: &[u8; 16],
516        ip: Option<&str>,
517    ) -> Result<GameProfile, ServerSessionServerError> {
518        azalea_auth::sessionserver::serverside_auth(username, public_key, private_key, ip).await
519    }
520
521    /// Change our state back to configuration.
522    #[must_use]
523    pub fn config(self) -> Connection<ServerboundConfigPacket, ClientboundConfigPacket> {
524        Connection::from(self)
525    }
526}
527
528impl Connection<ServerboundConfigPacket, ClientboundConfigPacket> {
529    /// Change our state from configuration to game. This is the state that's
530    /// used when the client is actually in the world.
531    #[must_use]
532    pub fn game(self) -> Connection<ServerboundGamePacket, ClientboundGamePacket> {
533        Connection::from(self)
534    }
535}
536
537impl Connection<ClientboundConfigPacket, ServerboundConfigPacket> {
538    /// Change our state from configuration to game. This is the state that's
539    /// used when the client is actually in the world.
540    #[must_use]
541    pub fn game(self) -> Connection<ClientboundGamePacket, ServerboundGamePacket> {
542        Connection::from(self)
543    }
544}
545
546impl Connection<ClientboundGamePacket, ServerboundGamePacket> {
547    /// Change our state back to configuration.
548    #[must_use]
549    pub fn config(self) -> Connection<ClientboundConfigPacket, ServerboundConfigPacket> {
550        Connection::from(self)
551    }
552}
553impl Connection<ServerboundGamePacket, ClientboundGamePacket> {
554    /// Change our state back to configuration.
555    #[must_use]
556    pub fn config(self) -> Connection<ServerboundConfigPacket, ClientboundConfigPacket> {
557        Connection::from(self)
558    }
559}
560
561// rust doesn't let us implement From because allegedly it conflicts with
562// `core`'s "impl<T> From<T> for T" so we do this instead
563impl<R1, W1> Connection<R1, W1>
564where
565    R1: ProtocolPacket + Debug,
566    W1: ProtocolPacket + Debug,
567{
568    /// Creates a `Connection` of a type from a `Connection` of another type.
569    /// Useful for servers or custom packets.
570    #[must_use]
571    pub fn from<R2, W2>(connection: Connection<R1, W1>) -> Connection<R2, W2>
572    where
573        R2: ProtocolPacket + Debug,
574        W2: ProtocolPacket + Debug,
575    {
576        Connection {
577            reader: ReadConnection {
578                raw: connection.reader.raw,
579                _reading: PhantomData,
580            },
581            writer: WriteConnection {
582                raw: connection.writer.raw,
583                _writing: PhantomData,
584            },
585        }
586    }
587
588    /// Convert an existing `TcpStream` into a `Connection`. Useful for servers.
589    pub fn wrap(stream: TcpStream) -> Connection<R1, W1> {
590        let (read_stream, write_stream) = stream.into_split();
591
592        Connection {
593            reader: ReadConnection {
594                raw: RawReadConnection {
595                    read_stream,
596                    buffer: Cursor::new(Vec::new()),
597                    compression_threshold: None,
598                    dec_cipher: None,
599                },
600                _reading: PhantomData,
601            },
602            writer: WriteConnection {
603                raw: RawWriteConnection {
604                    write_stream,
605                    compression_threshold: None,
606                    enc_cipher: None,
607                },
608                _writing: PhantomData,
609            },
610        }
611    }
612
613    /// Convert from a `Connection` into a `TcpStream`. Useful for servers.
614    pub fn unwrap(self) -> Result<TcpStream, ReuniteError> {
615        self.reader
616            .raw
617            .read_stream
618            .reunite(self.writer.raw.write_stream)
619    }
620}