azalea_protocol/
connect.rs

1//! Connect to remote servers/clients.
2
3use std::fmt::{self, Debug, Display};
4use std::io::{self, Cursor};
5use std::marker::PhantomData;
6use std::net::SocketAddr;
7
8use azalea_auth::game_profile::GameProfile;
9use azalea_auth::sessionserver::{ClientSessionServerError, ServerSessionServerError};
10use azalea_crypto::{Aes128CfbDec, Aes128CfbEnc};
11use thiserror::Error;
12use tokio::io::{AsyncWriteExt, BufStream};
13use tokio::net::TcpStream;
14use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf, ReuniteError};
15use tracing::{error, info};
16use uuid::Uuid;
17
18use crate::packets::ProtocolPacket;
19use crate::packets::config::{ClientboundConfigPacket, ServerboundConfigPacket};
20use crate::packets::game::{ClientboundGamePacket, ServerboundGamePacket};
21use crate::packets::handshake::{ClientboundHandshakePacket, ServerboundHandshakePacket};
22use crate::packets::login::c_hello::ClientboundHello;
23use crate::packets::login::{ClientboundLoginPacket, ServerboundLoginPacket};
24use crate::packets::status::{ClientboundStatusPacket, ServerboundStatusPacket};
25use crate::read::{ReadPacketError, deserialize_packet, read_raw_packet, try_read_raw_packet};
26use crate::write::{serialize_packet, write_raw_packet};
27
28pub struct RawReadConnection {
29    pub read_stream: OwnedReadHalf,
30    pub buffer: Cursor<Vec<u8>>,
31    pub compression_threshold: Option<u32>,
32    pub dec_cipher: Option<Aes128CfbDec>,
33}
34
35pub struct RawWriteConnection {
36    pub write_stream: OwnedWriteHalf,
37    pub compression_threshold: Option<u32>,
38    pub enc_cipher: Option<Aes128CfbEnc>,
39}
40
41/// The read half of a connection.
42pub struct ReadConnection<R: ProtocolPacket> {
43    pub raw: RawReadConnection,
44    _reading: PhantomData<R>,
45}
46
47/// The write half of a connection.
48pub struct WriteConnection<W: ProtocolPacket> {
49    pub raw: RawWriteConnection,
50    _writing: PhantomData<W>,
51}
52
53/// A connection that can read and write packets.
54///
55/// # Examples
56///
57/// Join an offline-mode server and go through the handshake.
58/// ```rust,no_run
59/// use azalea_protocol::{
60///     resolver,
61///     connect::Connection,
62///     packets::{
63///         self,
64///         ClientIntention, PROTOCOL_VERSION,
65///         login::{
66///             ClientboundLoginPacket,
67///             ServerboundHello,
68///             ServerboundKey
69///         },
70///         handshake::ServerboundIntention
71///     }
72/// };
73///
74/// #[tokio::main]
75/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
76///     let resolved_address = resolver::resolve_address(&"localhost".try_into().unwrap()).await?;
77///     let mut conn = Connection::new(&resolved_address).await?;
78///
79///     // handshake
80///     conn.write(ServerboundIntention {
81///         protocol_version: PROTOCOL_VERSION,
82///         hostname: resolved_address.ip().to_string(),
83///         port: resolved_address.port(),
84///         intention: ClientIntention::Login,
85///     }).await?;
86///
87///     let mut conn = conn.login();
88///
89///     // login
90///     conn.write(ServerboundHello {
91///         name: "bot".to_string(),
92///         profile_id: uuid::Uuid::nil(),
93///     }).await?;
94///
95///     let (conn, game_profile) = loop {
96///         let packet = conn.read().await?;
97///         match packet {
98///             ClientboundLoginPacket::Hello(p) => {
99///                 let e = azalea_crypto::encrypt(&p.public_key, &p.challenge).unwrap();
100///
101///                 conn.write(ServerboundKey {
102///                     key_bytes: e.encrypted_public_key,
103///                     encrypted_challenge: e.encrypted_challenge,
104///                 }).await?;
105///                 conn.set_encryption_key(e.secret_key);
106///             }
107///             ClientboundLoginPacket::LoginCompression(p) => {
108///                 conn.set_compression_threshold(p.compression_threshold);
109///             }
110///             ClientboundLoginPacket::LoginFinished(p) => {
111///                 break (conn.config(), p.game_profile);
112///             }
113///             ClientboundLoginPacket::LoginDisconnect(p) => {
114///                 eprintln!("login disconnect: {}", p.reason);
115///                 return Err("login disconnect".into());
116///             }
117///             ClientboundLoginPacket::CustomQuery(p) => {}
118///             ClientboundLoginPacket::CookieRequest(p) => {
119///                 conn.write(packets::login::ServerboundCookieResponse {
120///                     key: p.key,
121///                     payload: None,
122///                 })
123///                 .await?;
124///             }
125///         }
126///     };
127///
128///     Ok(())
129/// }
130/// ```
131pub struct Connection<R: ProtocolPacket, W: ProtocolPacket> {
132    pub reader: ReadConnection<R>,
133    pub writer: WriteConnection<W>,
134}
135
136impl RawReadConnection {
137    pub async fn read(&mut self) -> Result<Box<[u8]>, Box<ReadPacketError>> {
138        read_raw_packet::<_>(
139            &mut self.read_stream,
140            &mut self.buffer,
141            self.compression_threshold,
142            &mut self.dec_cipher,
143        )
144        .await
145    }
146
147    pub fn try_read(&mut self) -> Result<Option<Box<[u8]>>, Box<ReadPacketError>> {
148        try_read_raw_packet::<_>(
149            &mut self.read_stream,
150            &mut self.buffer,
151            self.compression_threshold,
152            &mut self.dec_cipher,
153        )
154    }
155}
156
157impl RawWriteConnection {
158    pub async fn write(&mut self, packet: &[u8]) -> io::Result<()> {
159        if let Err(e) = write_raw_packet(
160            packet,
161            &mut self.write_stream,
162            self.compression_threshold,
163            &mut self.enc_cipher,
164        )
165        .await
166        {
167            // detect broken pipe
168            if e.kind() == io::ErrorKind::BrokenPipe {
169                info!("Broken pipe, shutting down connection.");
170                if let Err(e) = self.shutdown().await {
171                    error!("Couldn't shut down: {}", e);
172                }
173            }
174            return Err(e);
175        }
176        Ok(())
177    }
178
179    /// End the connection.
180    pub async fn shutdown(&mut self) -> io::Result<()> {
181        self.write_stream.shutdown().await
182    }
183}
184
185impl<R> ReadConnection<R>
186where
187    R: ProtocolPacket + Debug,
188{
189    /// Read a packet from the stream.
190    pub async fn read(&mut self) -> Result<R, Box<ReadPacketError>> {
191        let raw_packet = self.raw.read().await?;
192        deserialize_packet(&mut Cursor::new(&raw_packet))
193    }
194
195    /// Try to read a packet from the stream, or return Ok(None) if there's no
196    /// packet.
197    pub fn try_read(&mut self) -> Result<Option<R>, Box<ReadPacketError>> {
198        let Some(raw_packet) = self.raw.try_read()? else {
199            return Ok(None);
200        };
201        Ok(Some(deserialize_packet(&mut Cursor::new(&raw_packet))?))
202    }
203}
204impl<W> WriteConnection<W>
205where
206    W: ProtocolPacket + Debug,
207{
208    /// Write a packet to the server.
209    pub async fn write(&mut self, packet: W) -> io::Result<()> {
210        self.raw.write(&serialize_packet(&packet).unwrap()).await
211    }
212
213    /// End the connection.
214    pub async fn shutdown(&mut self) -> io::Result<()> {
215        self.raw.shutdown().await
216    }
217}
218
219impl<R, W> Connection<R, W>
220where
221    R: ProtocolPacket + Debug,
222    W: ProtocolPacket + Debug,
223{
224    /// Read a packet from the other side of the connection.
225    pub async fn read(&mut self) -> Result<R, Box<ReadPacketError>> {
226        self.reader.read().await
227    }
228
229    /// Try to read a packet from the other side of the connection, or return
230    /// Ok(None) if there's no packet to read.
231    pub fn try_read(&mut self) -> Result<Option<R>, Box<ReadPacketError>> {
232        self.reader.try_read()
233    }
234
235    /// Write a packet to the other side of the connection.
236    pub async fn write(&mut self, packet: impl crate::packets::Packet<W>) -> io::Result<()> {
237        let packet = packet.into_variant();
238        self.writer.write(packet).await
239    }
240
241    /// Split the reader and writer into two objects. This doesn't allocate.
242    #[must_use]
243    pub fn into_split(self) -> (ReadConnection<R>, WriteConnection<W>) {
244        (self.reader, self.writer)
245    }
246
247    /// Split the reader and writer into the state-agnostic
248    /// [`RawReadConnection`] and [`RawWriteConnection`] types.
249    ///
250    /// This is meant to help with some types of proxies.
251    #[must_use]
252    pub fn into_split_raw(self) -> (RawReadConnection, RawWriteConnection) {
253        (self.reader.raw, self.writer.raw)
254    }
255}
256
257#[derive(Error, Debug)]
258pub enum ConnectionError {
259    #[error("{0}")]
260    Io(#[from] io::Error),
261}
262
263use socks5_impl::protocol::UserKey;
264
265/// An address and authentication method for connecting to a Socks5 proxy.
266#[derive(Debug, Clone)]
267pub struct Proxy {
268    pub addr: SocketAddr,
269    pub auth: Option<UserKey>,
270}
271
272impl Proxy {
273    pub fn new(addr: SocketAddr, auth: Option<UserKey>) -> Self {
274        Self { addr, auth }
275    }
276}
277impl Display for Proxy {
278    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
279        write!(f, "socks5://")?;
280        if let Some(auth) = &self.auth {
281            write!(f, "{auth}@")?;
282        }
283        write!(f, "{}", self.addr)
284    }
285}
286
287impl Connection<ClientboundHandshakePacket, ServerboundHandshakePacket> {
288    /// Create a new connection to the given address.
289    pub async fn new(address: &SocketAddr) -> Result<Self, ConnectionError> {
290        let stream = TcpStream::connect(address).await?;
291
292        // enable tcp_nodelay
293        stream.set_nodelay(true)?;
294
295        Self::new_from_stream(stream).await
296    }
297
298    /// Create a new connection to the given address and Socks5 proxy. If you're
299    /// not using a proxy, use [`Self::new`] instead.
300    pub async fn new_with_proxy(
301        address: &SocketAddr,
302        proxy: Proxy,
303    ) -> Result<Self, ConnectionError> {
304        let proxy_stream = TcpStream::connect(proxy.addr).await?;
305        let mut stream = BufStream::new(proxy_stream);
306
307        let _ = socks5_impl::client::connect(&mut stream, address, proxy.auth)
308            .await
309            .map_err(io::Error::other)?;
310
311        Self::new_from_stream(stream.into_inner()).await
312    }
313
314    /// Create a new connection from an existing stream. Useful if you want to
315    /// set custom options on the stream. Otherwise, just use [`Self::new`].
316    pub async fn new_from_stream(stream: TcpStream) -> Result<Self, ConnectionError> {
317        let (read_stream, write_stream) = stream.into_split();
318
319        Ok(Connection {
320            reader: ReadConnection {
321                raw: RawReadConnection {
322                    read_stream,
323                    buffer: Cursor::new(Vec::new()),
324                    compression_threshold: None,
325                    dec_cipher: None,
326                },
327                _reading: PhantomData,
328            },
329            writer: WriteConnection {
330                raw: RawWriteConnection {
331                    write_stream,
332                    compression_threshold: None,
333                    enc_cipher: None,
334                },
335                _writing: PhantomData,
336            },
337        })
338    }
339
340    /// Change our state from handshake to login. This is the state that is used
341    /// for logging in.
342    #[must_use]
343    pub fn login(self) -> Connection<ClientboundLoginPacket, ServerboundLoginPacket> {
344        Connection::from(self)
345    }
346
347    /// Change our state from handshake to status. This is the state that is
348    /// used for pinging the server.
349    #[must_use]
350    pub fn status(self) -> Connection<ClientboundStatusPacket, ServerboundStatusPacket> {
351        Connection::from(self)
352    }
353}
354
355impl Connection<ClientboundLoginPacket, ServerboundLoginPacket> {
356    /// Set our compression threshold, i.e. the maximum size that a packet is
357    /// allowed to be without getting compressed. Setting it to 0 means every
358    /// packet will be compressed. If you set it to less than 0,
359    /// then compression is disabled.
360    pub fn set_compression_threshold(&mut self, threshold: i32) {
361        // if you pass a threshold of less than 0, compression is disabled
362        if threshold >= 0 {
363            self.reader.raw.compression_threshold = Some(threshold as u32);
364            self.writer.raw.compression_threshold = Some(threshold as u32);
365        } else {
366            self.reader.raw.compression_threshold = None;
367            self.writer.raw.compression_threshold = None;
368        }
369    }
370
371    /// Set the encryption key that is used to encrypt and decrypt packets. It's
372    /// the same for both reading and writing.
373    pub fn set_encryption_key(&mut self, key: [u8; 16]) {
374        let (enc_cipher, dec_cipher) = azalea_crypto::create_cipher(&key);
375        self.reader.raw.dec_cipher = Some(dec_cipher);
376        self.writer.raw.enc_cipher = Some(enc_cipher);
377    }
378
379    /// Change our state from login to configuration. This is the state where
380    /// the server sends us the registries and resource pack and stuff.
381    #[must_use]
382    pub fn config(self) -> Connection<ClientboundConfigPacket, ServerboundConfigPacket> {
383        Connection::from(self)
384    }
385
386    /// Authenticate with Minecraft's servers, which is required to join
387    /// online-mode servers. This must happen when you get a
388    /// `ClientboundLoginPacket::Hello` packet.
389    ///
390    /// # Examples
391    ///
392    /// ```rust,no_run
393    /// use azalea_auth::AuthResult;
394    /// use azalea_protocol::connect::Connection;
395    /// use azalea_protocol::packets::login::{
396    ///     ClientboundLoginPacket,
397    ///     ServerboundKey
398    /// };
399    /// use uuid::Uuid;
400    /// # use azalea_protocol::ServerAddress;
401    ///
402    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
403    /// let AuthResult { access_token, profile } = azalea_auth::auth(
404    ///     "[email protected]",
405    ///     azalea_auth::AuthOpts::default()
406    /// ).await.expect("Couldn't authenticate");
407    /// #
408    /// # let address = ServerAddress::try_from("[email protected]").unwrap();
409    /// # let resolved_address = azalea_protocol::resolver::resolve_address(&address).await?;
410    ///
411    /// let mut conn = Connection::new(&resolved_address).await?;
412    ///
413    /// // transition to the login state, in a real program we would have done a handshake first
414    /// let mut conn = conn.login();
415    ///
416    /// match conn.read().await? {
417    ///     ClientboundLoginPacket::Hello(p) => {
418    ///         // tell Mojang we're joining the server & enable encryption
419    ///         let e = azalea_crypto::encrypt(&p.public_key, &p.challenge).unwrap();
420    ///         conn.authenticate(
421    ///             &access_token,
422    ///             &profile.id,
423    ///             e.secret_key,
424    ///             &p
425    ///         ).await?;
426    ///         conn.write(ServerboundKey {
427    ///             key_bytes: e.encrypted_public_key,
428    ///             encrypted_challenge: e.encrypted_challenge,
429    ///         }).await?;
430    ///         conn.set_encryption_key(e.secret_key);
431    ///     }
432    ///     _ => {}
433    /// }
434    /// # Ok(())
435    /// # }
436    /// ```
437    pub async fn authenticate(
438        &self,
439        access_token: &str,
440        uuid: &Uuid,
441        private_key: [u8; 16],
442        packet: &ClientboundHello,
443    ) -> Result<(), ClientSessionServerError> {
444        azalea_auth::sessionserver::join(
445            access_token,
446            &packet.public_key,
447            &private_key,
448            uuid,
449            &packet.server_id,
450        )
451        .await
452    }
453}
454
455impl Connection<ServerboundHandshakePacket, ClientboundHandshakePacket> {
456    /// Change our state from handshake to login. This is the state that is used
457    /// for logging in.
458    #[must_use]
459    pub fn login(self) -> Connection<ServerboundLoginPacket, ClientboundLoginPacket> {
460        Connection::from(self)
461    }
462
463    /// Change our state from handshake to status. This is the state that is
464    /// used for pinging the server.
465    #[must_use]
466    pub fn status(self) -> Connection<ServerboundStatusPacket, ClientboundStatusPacket> {
467        Connection::from(self)
468    }
469}
470
471impl Connection<ServerboundLoginPacket, ClientboundLoginPacket> {
472    /// Set our compression threshold, i.e. the maximum size that a packet is
473    /// allowed to be without getting compressed. If you set it to less than 0
474    /// then compression gets disabled.
475    pub fn set_compression_threshold(&mut self, threshold: i32) {
476        // if you pass a threshold of less than 0, compression is disabled
477        if threshold >= 0 {
478            self.reader.raw.compression_threshold = Some(threshold as u32);
479            self.writer.raw.compression_threshold = Some(threshold as u32);
480        } else {
481            self.reader.raw.compression_threshold = None;
482            self.writer.raw.compression_threshold = None;
483        }
484    }
485
486    /// Set the encryption key that is used to encrypt and decrypt packets. It's
487    /// the same for both reading and writing.
488    pub fn set_encryption_key(&mut self, key: [u8; 16]) {
489        let (enc_cipher, dec_cipher) = azalea_crypto::create_cipher(&key);
490        self.reader.raw.dec_cipher = Some(dec_cipher);
491        self.writer.raw.enc_cipher = Some(enc_cipher);
492    }
493
494    /// Change our state from login to game. This is the state that's used when
495    /// the client is actually in the game.
496    #[must_use]
497    pub fn game(self) -> Connection<ServerboundGamePacket, ClientboundGamePacket> {
498        Connection::from(self)
499    }
500
501    /// Verify connecting clients have authenticated with Minecraft's servers.
502    /// This must happen after the client sends a `ServerboundLoginPacket::Key`
503    /// packet.
504    pub async fn authenticate(
505        &self,
506        username: &str,
507        public_key: &[u8],
508        private_key: &[u8; 16],
509        ip: Option<&str>,
510    ) -> Result<GameProfile, ServerSessionServerError> {
511        azalea_auth::sessionserver::serverside_auth(username, public_key, private_key, ip).await
512    }
513
514    /// Change our state back to configuration.
515    #[must_use]
516    pub fn config(self) -> Connection<ServerboundConfigPacket, ClientboundConfigPacket> {
517        Connection::from(self)
518    }
519}
520
521impl Connection<ServerboundConfigPacket, ClientboundConfigPacket> {
522    /// Change our state from configuration to game. This is the state that's
523    /// used when the client is actually in the world.
524    #[must_use]
525    pub fn game(self) -> Connection<ServerboundGamePacket, ClientboundGamePacket> {
526        Connection::from(self)
527    }
528}
529
530impl Connection<ClientboundConfigPacket, ServerboundConfigPacket> {
531    /// Change our state from configuration to game. This is the state that's
532    /// used when the client is actually in the world.
533    #[must_use]
534    pub fn game(self) -> Connection<ClientboundGamePacket, ServerboundGamePacket> {
535        Connection::from(self)
536    }
537}
538
539impl Connection<ClientboundGamePacket, ServerboundGamePacket> {
540    /// Change our state back to configuration.
541    #[must_use]
542    pub fn config(self) -> Connection<ClientboundConfigPacket, ServerboundConfigPacket> {
543        Connection::from(self)
544    }
545}
546impl Connection<ServerboundGamePacket, ClientboundGamePacket> {
547    /// Change our state back to configuration.
548    #[must_use]
549    pub fn config(self) -> Connection<ServerboundConfigPacket, ClientboundConfigPacket> {
550        Connection::from(self)
551    }
552}
553
554// rust doesn't let us implement From because allegedly it conflicts with
555// `core`'s "impl<T> From<T> for T" so we do this instead
556impl<R1, W1> Connection<R1, W1>
557where
558    R1: ProtocolPacket + Debug,
559    W1: ProtocolPacket + Debug,
560{
561    /// Creates a `Connection` of a type from a `Connection` of another type.
562    /// Useful for servers or custom packets.
563    #[must_use]
564    pub fn from<R2, W2>(connection: Connection<R1, W1>) -> Connection<R2, W2>
565    where
566        R2: ProtocolPacket + Debug,
567        W2: ProtocolPacket + Debug,
568    {
569        Connection {
570            reader: ReadConnection {
571                raw: connection.reader.raw,
572                _reading: PhantomData,
573            },
574            writer: WriteConnection {
575                raw: connection.writer.raw,
576                _writing: PhantomData,
577            },
578        }
579    }
580
581    /// Convert an existing `TcpStream` into a `Connection`. Useful for servers.
582    pub fn wrap(stream: TcpStream) -> Connection<R1, W1> {
583        let (read_stream, write_stream) = stream.into_split();
584
585        Connection {
586            reader: ReadConnection {
587                raw: RawReadConnection {
588                    read_stream,
589                    buffer: Cursor::new(Vec::new()),
590                    compression_threshold: None,
591                    dec_cipher: None,
592                },
593                _reading: PhantomData,
594            },
595            writer: WriteConnection {
596                raw: RawWriteConnection {
597                    write_stream,
598                    compression_threshold: None,
599                    enc_cipher: None,
600                },
601                _writing: PhantomData,
602            },
603        }
604    }
605
606    /// Convert from a `Connection` into a `TcpStream`. Useful for servers.
607    pub fn unwrap(self) -> Result<TcpStream, ReuniteError> {
608        self.reader
609            .raw
610            .read_stream
611            .reunite(self.writer.raw.write_stream)
612    }
613}