azalea_protocol/
connect.rs

1//! Connect to remote servers/clients.
2
3use std::{
4    fmt::{self, Debug, Display},
5    io::{self, Cursor},
6    marker::PhantomData,
7    net::SocketAddr,
8};
9
10#[cfg(feature = "online-mode")]
11use azalea_auth::{
12    game_profile::GameProfile,
13    sessionserver::{ClientSessionServerError, ServerSessionServerError},
14};
15use azalea_crypto::{Aes128CfbDec, Aes128CfbEnc};
16use thiserror::Error;
17use tokio::{
18    io::{AsyncWriteExt, BufStream},
19    net::{
20        TcpStream,
21        tcp::{OwnedReadHalf, OwnedWriteHalf, ReuniteError},
22    },
23};
24use tracing::{error, info};
25#[cfg(feature = "online-mode")]
26use uuid::Uuid;
27
28#[cfg(feature = "online-mode")]
29use crate::packets::login::ClientboundHello;
30use crate::{
31    packets::{
32        ProtocolPacket,
33        config::{ClientboundConfigPacket, ServerboundConfigPacket},
34        game::{ClientboundGamePacket, ServerboundGamePacket},
35        handshake::{ClientboundHandshakePacket, ServerboundHandshakePacket},
36        login::{ClientboundLoginPacket, ServerboundLoginPacket},
37        status::{ClientboundStatusPacket, ServerboundStatusPacket},
38    },
39    read::{ReadPacketError, deserialize_packet, read_raw_packet, try_read_raw_packet},
40    write::{serialize_packet, write_raw_packet},
41};
42
43pub struct RawReadConnection {
44    pub read_stream: OwnedReadHalf,
45    pub buffer: Cursor<Vec<u8>>,
46    pub compression_threshold: Option<u32>,
47    pub dec_cipher: Option<Aes128CfbDec>,
48}
49
50pub struct RawWriteConnection {
51    pub write_stream: OwnedWriteHalf,
52    pub compression_threshold: Option<u32>,
53    pub enc_cipher: Option<Aes128CfbEnc>,
54}
55
56/// The read half of a connection.
57pub struct ReadConnection<R: ProtocolPacket> {
58    pub raw: RawReadConnection,
59    _reading: PhantomData<R>,
60}
61
62/// The write half of a connection.
63pub struct WriteConnection<W: ProtocolPacket> {
64    pub raw: RawWriteConnection,
65    _writing: PhantomData<W>,
66}
67
68/// A connection that can read and write packets.
69///
70/// # Examples
71///
72/// Join an offline-mode server and go through the handshake.
73/// ```rust,no_run
74/// use azalea_protocol::{
75///     connect::Connection,
76///     packets::{
77///         self, ClientIntention, PROTOCOL_VERSION,
78///         handshake::ServerboundIntention,
79///         login::{ClientboundLoginPacket, ServerboundHello, ServerboundKey},
80///     },
81///     resolver,
82/// };
83///
84/// #[tokio::main]
85/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
86///     let resolved_address = resolver::resolve_address(&"localhost".try_into().unwrap()).await?;
87///     let mut conn = Connection::new(&resolved_address).await?;
88///
89///     // handshake
90///     conn.write(ServerboundIntention {
91///         protocol_version: PROTOCOL_VERSION,
92///         hostname: resolved_address.ip().to_string(),
93///         port: resolved_address.port(),
94///         intention: ClientIntention::Login,
95///     })
96///     .await?;
97///
98///     let mut conn = conn.login();
99///
100///     // login
101///     conn.write(ServerboundHello {
102///         name: "bot".to_string(),
103///         profile_id: uuid::Uuid::nil(),
104///     })
105///     .await?;
106///
107///     let (conn, game_profile) = loop {
108///         let packet = conn.read().await?;
109///         match packet {
110///             ClientboundLoginPacket::Hello(p) => {
111///                 let e = azalea_crypto::encrypt(&p.public_key, &p.challenge).unwrap();
112///
113///                 conn.write(ServerboundKey {
114///                     key_bytes: e.encrypted_public_key,
115///                     encrypted_challenge: e.encrypted_challenge,
116///                 })
117///                 .await?;
118///                 conn.set_encryption_key(e.secret_key);
119///             }
120///             ClientboundLoginPacket::LoginCompression(p) => {
121///                 conn.set_compression_threshold(p.compression_threshold);
122///             }
123///             ClientboundLoginPacket::LoginFinished(p) => {
124///                 break (conn.config(), p.game_profile);
125///             }
126///             ClientboundLoginPacket::LoginDisconnect(p) => {
127///                 eprintln!("login disconnect: {}", p.reason);
128///                 return Err("login disconnect".into());
129///             }
130///             ClientboundLoginPacket::CustomQuery(p) => {}
131///             ClientboundLoginPacket::CookieRequest(p) => {
132///                 conn.write(packets::login::ServerboundCookieResponse {
133///                     key: p.key,
134///                     payload: None,
135///                 })
136///                 .await?;
137///             }
138///         }
139///     };
140///
141///     Ok(())
142/// }
143/// ```
144pub struct Connection<R: ProtocolPacket, W: ProtocolPacket> {
145    pub reader: ReadConnection<R>,
146    pub writer: WriteConnection<W>,
147}
148
149impl RawReadConnection {
150    pub async fn read(&mut self) -> Result<Box<[u8]>, Box<ReadPacketError>> {
151        read_raw_packet::<_>(
152            &mut self.read_stream,
153            &mut self.buffer,
154            self.compression_threshold,
155            &mut self.dec_cipher,
156        )
157        .await
158    }
159
160    pub fn try_read(&mut self) -> Result<Option<Box<[u8]>>, Box<ReadPacketError>> {
161        try_read_raw_packet::<_>(
162            &mut self.read_stream,
163            &mut self.buffer,
164            self.compression_threshold,
165            &mut self.dec_cipher,
166        )
167    }
168}
169
170impl RawWriteConnection {
171    pub async fn write(&mut self, packet: &[u8]) -> io::Result<()> {
172        if let Err(e) = write_raw_packet(
173            packet,
174            &mut self.write_stream,
175            self.compression_threshold,
176            &mut self.enc_cipher,
177        )
178        .await
179        {
180            // detect broken pipe
181            if e.kind() == io::ErrorKind::BrokenPipe {
182                info!("Broken pipe, shutting down connection.");
183                if let Err(e) = self.shutdown().await {
184                    error!("Couldn't shut down: {}", e);
185                }
186            }
187            return Err(e);
188        }
189        Ok(())
190    }
191
192    /// End the connection.
193    pub async fn shutdown(&mut self) -> io::Result<()> {
194        self.write_stream.shutdown().await
195    }
196}
197
198impl<R> ReadConnection<R>
199where
200    R: ProtocolPacket + Debug,
201{
202    /// Read a packet from the stream.
203    pub async fn read(&mut self) -> Result<R, Box<ReadPacketError>> {
204        let raw_packet = self.raw.read().await?;
205        deserialize_packet(&mut Cursor::new(&raw_packet))
206    }
207
208    /// Try to read a packet from the stream, or return Ok(None) if there's no
209    /// packet.
210    pub fn try_read(&mut self) -> Result<Option<R>, Box<ReadPacketError>> {
211        let Some(raw_packet) = self.raw.try_read()? else {
212            return Ok(None);
213        };
214        Ok(Some(deserialize_packet(&mut Cursor::new(&raw_packet))?))
215    }
216}
217impl<W> WriteConnection<W>
218where
219    W: ProtocolPacket + Debug,
220{
221    /// Write a packet to the server.
222    pub async fn write(&mut self, packet: W) -> io::Result<()> {
223        self.raw.write(&serialize_packet(&packet).unwrap()).await
224    }
225
226    /// End the connection.
227    pub async fn shutdown(&mut self) -> io::Result<()> {
228        self.raw.shutdown().await
229    }
230}
231
232impl<R, W> Connection<R, W>
233where
234    R: ProtocolPacket + Debug,
235    W: ProtocolPacket + Debug,
236{
237    /// Read a packet from the other side of the connection.
238    pub async fn read(&mut self) -> Result<R, Box<ReadPacketError>> {
239        self.reader.read().await
240    }
241
242    /// Try to read a packet from the other side of the connection, or return
243    /// Ok(None) if there's no packet to read.
244    pub fn try_read(&mut self) -> Result<Option<R>, Box<ReadPacketError>> {
245        self.reader.try_read()
246    }
247
248    /// Write a packet to the other side of the connection.
249    pub async fn write(&mut self, packet: impl crate::packets::Packet<W>) -> io::Result<()> {
250        let packet = packet.into_variant();
251        self.writer.write(packet).await
252    }
253
254    /// Split the reader and writer into two objects.
255    ///
256    /// This doesn't allocate.
257    #[must_use]
258    pub fn into_split(self) -> (ReadConnection<R>, WriteConnection<W>) {
259        (self.reader, self.writer)
260    }
261
262    /// Split the reader and writer into the state-agnostic
263    /// [`RawReadConnection`] and [`RawWriteConnection`] types.
264    ///
265    /// This is meant to help with some types of proxies.
266    #[must_use]
267    pub fn into_split_raw(self) -> (RawReadConnection, RawWriteConnection) {
268        (self.reader.raw, self.writer.raw)
269    }
270}
271
272#[derive(Error, Debug)]
273pub enum ConnectionError {
274    #[error("{0}")]
275    Io(#[from] io::Error),
276}
277
278use socks5_impl::protocol::UserKey;
279
280/// An address and authentication method for connecting to a Socks5 proxy.
281#[derive(Debug, Clone)]
282pub struct Proxy {
283    pub addr: SocketAddr,
284    pub auth: Option<UserKey>,
285}
286
287impl Proxy {
288    pub fn new(addr: SocketAddr, auth: Option<UserKey>) -> Self {
289        Self { addr, auth }
290    }
291}
292impl Display for Proxy {
293    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
294        write!(f, "socks5://")?;
295        if let Some(auth) = &self.auth {
296            write!(f, "{auth}@")?;
297        }
298        write!(f, "{}", self.addr)
299    }
300}
301
302impl Connection<ClientboundHandshakePacket, ServerboundHandshakePacket> {
303    /// Create a new connection to the given address.
304    pub async fn new(address: &SocketAddr) -> Result<Self, ConnectionError> {
305        let stream = TcpStream::connect(address).await?;
306
307        // enable tcp_nodelay
308        stream.set_nodelay(true)?;
309
310        Self::new_from_stream(stream).await
311    }
312
313    /// Create a new connection to the given address and Socks5 proxy.
314    ///
315    /// If you're not using a proxy, use [`Self::new`] instead.
316    pub async fn new_with_proxy(
317        address: &SocketAddr,
318        proxy: Proxy,
319    ) -> Result<Self, ConnectionError> {
320        let proxy_stream = TcpStream::connect(proxy.addr).await?;
321        let mut stream = BufStream::new(proxy_stream);
322
323        let _ = socks5_impl::client::connect(&mut stream, address, proxy.auth)
324            .await
325            .map_err(io::Error::other)?;
326
327        Self::new_from_stream(stream.into_inner()).await
328    }
329
330    /// Create a new connection from an existing stream.
331    ///
332    /// Useful if you want to set custom options on the stream. Otherwise, just
333    /// use [`Self::new`].
334    pub async fn new_from_stream(stream: TcpStream) -> Result<Self, ConnectionError> {
335        let (read_stream, write_stream) = stream.into_split();
336
337        Ok(Connection {
338            reader: ReadConnection {
339                raw: RawReadConnection {
340                    read_stream,
341                    buffer: Cursor::new(Vec::new()),
342                    compression_threshold: None,
343                    dec_cipher: None,
344                },
345                _reading: PhantomData,
346            },
347            writer: WriteConnection {
348                raw: RawWriteConnection {
349                    write_stream,
350                    compression_threshold: None,
351                    enc_cipher: None,
352                },
353                _writing: PhantomData,
354            },
355        })
356    }
357
358    /// Change our state from handshake to login.
359    ///
360    /// This is the state that is used for logging in.
361    #[must_use]
362    pub fn login(self) -> Connection<ClientboundLoginPacket, ServerboundLoginPacket> {
363        Connection::from(self)
364    }
365
366    /// Change our state from handshake to status.
367    ///
368    /// This is the state that is used for pinging the server.
369    #[must_use]
370    pub fn status(self) -> Connection<ClientboundStatusPacket, ServerboundStatusPacket> {
371        Connection::from(self)
372    }
373}
374
375impl Connection<ClientboundLoginPacket, ServerboundLoginPacket> {
376    /// Set our compression threshold, i.e. the maximum size that a packet is
377    /// allowed to be without getting compressed.
378    ///
379    /// Setting it to 0 means every packet will be compressed. If you set it to
380    /// less than 0 then compression is disabled.
381    pub fn set_compression_threshold(&mut self, threshold: i32) {
382        // if you pass a threshold of less than 0, compression is disabled
383        if threshold >= 0 {
384            self.reader.raw.compression_threshold = Some(threshold as u32);
385            self.writer.raw.compression_threshold = Some(threshold as u32);
386        } else {
387            self.reader.raw.compression_threshold = None;
388            self.writer.raw.compression_threshold = None;
389        }
390    }
391
392    /// Set the encryption key that is used to encrypt and decrypt packets.
393    ///
394    /// It's the same for both reading and writing.
395    pub fn set_encryption_key(&mut self, key: [u8; 16]) {
396        let (enc_cipher, dec_cipher) = azalea_crypto::create_cipher(&key);
397        self.reader.raw.dec_cipher = Some(dec_cipher);
398        self.writer.raw.enc_cipher = Some(enc_cipher);
399    }
400
401    /// Change our state from login to configuration.
402    ///
403    /// This is the state where the server sends us the registries and the
404    /// resource pack.
405    #[must_use]
406    pub fn config(self) -> Connection<ClientboundConfigPacket, ServerboundConfigPacket> {
407        Connection::from(self)
408    }
409
410    /// Authenticate with Minecraft's servers, which is required to join
411    /// online-mode servers.
412    ///
413    /// This must happen when you get a `ClientboundLoginPacket::Hello` packet.
414    ///
415    /// # Examples
416    ///
417    /// ```rust,no_run
418    /// use azalea_auth::AuthResult;
419    /// use azalea_protocol::{
420    ///     connect::Connection,
421    ///     packets::login::{ClientboundLoginPacket, ServerboundKey},
422    /// };
423    /// use uuid::Uuid;
424    /// # use azalea_protocol::ServerAddress;
425    ///
426    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
427    /// let AuthResult {
428    ///     access_token,
429    ///     profile,
430    /// } = azalea_auth::auth("[email protected]", azalea_auth::AuthOpts::default())
431    ///     .await
432    ///     .expect("Couldn't authenticate");
433    /// #
434    /// # let address = ServerAddress::try_from("[email protected]").unwrap();
435    /// # let resolved_address = azalea_protocol::resolver::resolve_address(&address).await?;
436    ///
437    /// let mut conn = Connection::new(&resolved_address).await?;
438    ///
439    /// // transition to the login state, in a real program we would have done a handshake first
440    /// let mut conn = conn.login();
441    ///
442    /// match conn.read().await? {
443    ///     ClientboundLoginPacket::Hello(p) => {
444    ///         // tell Mojang we're joining the server & enable encryption
445    ///         let e = azalea_crypto::encrypt(&p.public_key, &p.challenge).unwrap();
446    ///         conn.authenticate(&access_token, &profile.id, e.secret_key, &p)
447    ///             .await?;
448    ///         conn.write(ServerboundKey {
449    ///             key_bytes: e.encrypted_public_key,
450    ///             encrypted_challenge: e.encrypted_challenge,
451    ///         })
452    ///         .await?;
453    ///         conn.set_encryption_key(e.secret_key);
454    ///     }
455    ///     _ => {}
456    /// }
457    /// # Ok(())
458    /// # }
459    /// ```
460    #[cfg(feature = "online-mode")]
461    pub async fn authenticate(
462        &self,
463        access_token: &str,
464        uuid: &Uuid,
465        private_key: [u8; 16],
466        packet: &ClientboundHello,
467    ) -> Result<(), ClientSessionServerError> {
468        azalea_auth::sessionserver::join(
469            access_token,
470            &packet.public_key,
471            &private_key,
472            uuid,
473            &packet.server_id,
474        )
475        .await
476    }
477}
478
479impl Connection<ServerboundHandshakePacket, ClientboundHandshakePacket> {
480    /// Change our state from handshake to login.
481    ///
482    /// This is the state that is used while negotiating encryption and
483    /// authenticating with Mojang.
484    #[must_use]
485    pub fn login(self) -> Connection<ServerboundLoginPacket, ClientboundLoginPacket> {
486        Connection::from(self)
487    }
488
489    /// Change our state from handshake to status.
490    ///
491    /// This is the state that is used for pinging the server.
492    #[must_use]
493    pub fn status(self) -> Connection<ServerboundStatusPacket, ClientboundStatusPacket> {
494        Connection::from(self)
495    }
496}
497
498impl Connection<ServerboundLoginPacket, ClientboundLoginPacket> {
499    /// Set our compression threshold, i.e. the maximum size that a packet is
500    /// allowed to be without getting compressed.
501    ///
502    /// If you set it to less than 0 then compression gets disabled.
503    pub fn set_compression_threshold(&mut self, threshold: i32) {
504        // if you pass a threshold of less than 0, compression is disabled
505        if threshold >= 0 {
506            self.reader.raw.compression_threshold = Some(threshold as u32);
507            self.writer.raw.compression_threshold = Some(threshold as u32);
508        } else {
509            self.reader.raw.compression_threshold = None;
510            self.writer.raw.compression_threshold = None;
511        }
512    }
513
514    /// Set the encryption key that is used to encrypt and decrypt packets.
515    ///
516    /// It's the same for both reading and writing.
517    pub fn set_encryption_key(&mut self, key: [u8; 16]) {
518        let (enc_cipher, dec_cipher) = azalea_crypto::create_cipher(&key);
519        self.reader.raw.dec_cipher = Some(dec_cipher);
520        self.writer.raw.enc_cipher = Some(enc_cipher);
521    }
522
523    /// Change our state from login to game.
524    ///
525    /// This is the state that's used when the client is actually in the game.
526    #[must_use]
527    pub fn game(self) -> Connection<ServerboundGamePacket, ClientboundGamePacket> {
528        Connection::from(self)
529    }
530
531    /// Verify connecting clients have authenticated with Minecraft's servers.
532    /// This must happen after the client sends a `ServerboundLoginPacket::Key`
533    /// packet.
534    #[cfg(feature = "online-mode")]
535    pub async fn authenticate(
536        &self,
537        username: &str,
538        public_key: &[u8],
539        private_key: &[u8; 16],
540        ip: Option<&str>,
541    ) -> Result<GameProfile, ServerSessionServerError> {
542        azalea_auth::sessionserver::serverside_auth(username, public_key, private_key, ip).await
543    }
544
545    /// Change our state back to configuration.
546    #[must_use]
547    pub fn config(self) -> Connection<ServerboundConfigPacket, ClientboundConfigPacket> {
548        Connection::from(self)
549    }
550}
551
552impl Connection<ServerboundConfigPacket, ClientboundConfigPacket> {
553    /// Change our state from configuration to game.
554    ///
555    /// This is the state that's used when the client is actually in the world.
556    #[must_use]
557    pub fn game(self) -> Connection<ServerboundGamePacket, ClientboundGamePacket> {
558        Connection::from(self)
559    }
560}
561
562impl Connection<ClientboundConfigPacket, ServerboundConfigPacket> {
563    /// Change our state from configuration to game.
564    ///
565    /// This is the state that's used when the client is actually in the world.
566    #[must_use]
567    pub fn game(self) -> Connection<ClientboundGamePacket, ServerboundGamePacket> {
568        Connection::from(self)
569    }
570}
571
572impl Connection<ClientboundGamePacket, ServerboundGamePacket> {
573    /// Change our state back to configuration.
574    #[must_use]
575    pub fn config(self) -> Connection<ClientboundConfigPacket, ServerboundConfigPacket> {
576        Connection::from(self)
577    }
578}
579impl Connection<ServerboundGamePacket, ClientboundGamePacket> {
580    /// Change our state back to configuration.
581    #[must_use]
582    pub fn config(self) -> Connection<ServerboundConfigPacket, ClientboundConfigPacket> {
583        Connection::from(self)
584    }
585}
586
587// rust doesn't let us implement From because allegedly it conflicts with
588// `core`'s "impl<T> From<T> for T" so we do this instead
589impl<R1, W1> Connection<R1, W1>
590where
591    R1: ProtocolPacket + Debug,
592    W1: ProtocolPacket + Debug,
593{
594    /// Creates a `Connection` of a type from a `Connection` of another type.
595    /// Useful for servers or custom packets.
596    #[must_use]
597    pub fn from<R2, W2>(connection: Connection<R1, W1>) -> Connection<R2, W2>
598    where
599        R2: ProtocolPacket + Debug,
600        W2: ProtocolPacket + Debug,
601    {
602        Connection {
603            reader: ReadConnection {
604                raw: connection.reader.raw,
605                _reading: PhantomData,
606            },
607            writer: WriteConnection {
608                raw: connection.writer.raw,
609                _writing: PhantomData,
610            },
611        }
612    }
613
614    /// Convert an existing `TcpStream` into a `Connection`. Useful for servers.
615    pub fn wrap(stream: TcpStream) -> Connection<R1, W1> {
616        let (read_stream, write_stream) = stream.into_split();
617
618        Connection {
619            reader: ReadConnection {
620                raw: RawReadConnection {
621                    read_stream,
622                    buffer: Cursor::new(Vec::new()),
623                    compression_threshold: None,
624                    dec_cipher: None,
625                },
626                _reading: PhantomData,
627            },
628            writer: WriteConnection {
629                raw: RawWriteConnection {
630                    write_stream,
631                    compression_threshold: None,
632                    enc_cipher: None,
633                },
634                _writing: PhantomData,
635            },
636        }
637    }
638
639    /// Convert from a `Connection` into a `TcpStream`. Useful for servers.
640    pub fn unwrap(self) -> Result<TcpStream, ReuniteError> {
641        self.reader
642            .raw
643            .read_stream
644            .reunite(self.writer.raw.write_stream)
645    }
646}