Skip to main content

azalea_protocol/
connect.rs

1//! Connect to remote servers/clients.
2
3use std::{
4    fmt::{self, Debug, Display},
5    io::{self, Cursor},
6    marker::PhantomData,
7    net::SocketAddr,
8};
9
10#[cfg(feature = "online-mode")]
11use azalea_auth::{
12    game_profile::GameProfile,
13    sessionserver::{ClientSessionServerError, ServerSessionServerError},
14};
15use azalea_crypto::{Aes128CfbDec, Aes128CfbEnc};
16use thiserror::Error;
17use tokio::{
18    io::{AsyncWriteExt, BufStream},
19    net::{
20        TcpStream,
21        tcp::{OwnedReadHalf, OwnedWriteHalf, ReuniteError},
22    },
23};
24use tracing::{error, info};
25#[cfg(feature = "online-mode")]
26use uuid::Uuid;
27
28#[cfg(feature = "online-mode")]
29use crate::packets::login::ClientboundHello;
30use crate::{
31    packets::{
32        ProtocolPacket,
33        config::{ClientboundConfigPacket, ServerboundConfigPacket},
34        game::{ClientboundGamePacket, ServerboundGamePacket},
35        handshake::{ClientboundHandshakePacket, ServerboundHandshakePacket},
36        login::{ClientboundLoginPacket, ServerboundLoginPacket},
37        status::{ClientboundStatusPacket, ServerboundStatusPacket},
38    },
39    read::{ReadPacketError, deserialize_packet, read_raw_packet, try_read_raw_packet},
40    write::{serialize_packet, write_raw_packet, write_raw_packets},
41};
42
43pub struct RawReadConnection {
44    pub read_stream: OwnedReadHalf,
45    pub buffer: Cursor<Vec<u8>>,
46    pub compression_threshold: Option<u32>,
47    pub dec_cipher: Option<Aes128CfbDec>,
48}
49
50pub struct RawWriteConnection {
51    pub write_stream: OwnedWriteHalf,
52    pub compression_threshold: Option<u32>,
53    pub enc_cipher: Option<Aes128CfbEnc>,
54}
55
56/// The read half of a connection.
57pub struct ReadConnection<R: ProtocolPacket> {
58    pub raw: RawReadConnection,
59    _reading: PhantomData<R>,
60}
61
62/// The write half of a connection.
63pub struct WriteConnection<W: ProtocolPacket> {
64    pub raw: RawWriteConnection,
65    _writing: PhantomData<W>,
66}
67
68/// A connection that can read and write packets.
69///
70/// # Examples
71///
72/// Join an offline-mode server and go through the handshake.
73/// ```rust,no_run
74/// use azalea_protocol::{
75///     connect::Connection,
76///     packets::{
77///         self, ClientIntention, PROTOCOL_VERSION,
78///         handshake::ServerboundIntention,
79///         login::{ClientboundLoginPacket, ServerboundHello, ServerboundKey},
80///     },
81///     resolver,
82/// };
83///
84/// #[tokio::main]
85/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
86///     let resolved_address = resolver::resolve_address(&"localhost".try_into().unwrap()).await?;
87///     let mut conn = Connection::new(&resolved_address).await?;
88///
89///     // handshake
90///     conn.write(ServerboundIntention {
91///         protocol_version: PROTOCOL_VERSION,
92///         hostname: resolved_address.ip().to_string(),
93///         port: resolved_address.port(),
94///         intention: ClientIntention::Login,
95///     })
96///     .await?;
97///
98///     let mut conn = conn.login();
99///
100///     // login
101///     conn.write(ServerboundHello {
102///         name: "bot".to_owned(),
103///         profile_id: uuid::Uuid::nil(),
104///     })
105///     .await?;
106///
107///     let (conn, game_profile) = loop {
108///         let packet = conn.read().await?;
109///         match packet {
110///             ClientboundLoginPacket::Hello(p) => {
111///                 let e = azalea_crypto::encrypt(&p.public_key, &p.challenge).unwrap();
112///
113///                 conn.write(ServerboundKey {
114///                     key_bytes: e.encrypted_public_key,
115///                     encrypted_challenge: e.encrypted_challenge,
116///                 })
117///                 .await?;
118///                 conn.set_encryption_key(e.secret_key);
119///             }
120///             ClientboundLoginPacket::LoginCompression(p) => {
121///                 conn.set_compression_threshold(p.compression_threshold);
122///             }
123///             ClientboundLoginPacket::LoginFinished(p) => {
124///                 break (conn.config(), p.game_profile);
125///             }
126///             ClientboundLoginPacket::LoginDisconnect(p) => {
127///                 eprintln!("login disconnect: {}", p.reason);
128///                 return Err("login disconnect".into());
129///             }
130///             ClientboundLoginPacket::CustomQuery(p) => {}
131///             ClientboundLoginPacket::CookieRequest(p) => {
132///                 conn.write(packets::login::ServerboundCookieResponse {
133///                     key: p.key,
134///                     payload: None,
135///                 })
136///                 .await?;
137///             }
138///         }
139///     };
140///
141///     Ok(())
142/// }
143/// ```
144pub struct Connection<R: ProtocolPacket, W: ProtocolPacket> {
145    pub reader: ReadConnection<R>,
146    pub writer: WriteConnection<W>,
147}
148
149impl RawReadConnection {
150    pub async fn read(&mut self) -> Result<Box<[u8]>, Box<ReadPacketError>> {
151        read_raw_packet::<_>(
152            &mut self.read_stream,
153            &mut self.buffer,
154            self.compression_threshold,
155            &mut self.dec_cipher,
156        )
157        .await
158    }
159
160    pub fn try_read(&mut self) -> Result<Option<Box<[u8]>>, Box<ReadPacketError>> {
161        try_read_raw_packet::<_>(
162            &mut self.read_stream,
163            &mut self.buffer,
164            self.compression_threshold,
165            &mut self.dec_cipher,
166        )
167    }
168}
169
170impl RawWriteConnection {
171    pub async fn write(&mut self, packet: &[u8]) -> io::Result<()> {
172        if let Err(e) = write_raw_packet(
173            packet,
174            &mut self.write_stream,
175            self.compression_threshold,
176            &mut self.enc_cipher,
177        )
178        .await
179        {
180            // detect broken pipe
181            if e.kind() == io::ErrorKind::BrokenPipe {
182                info!("Broken pipe, shutting down connection.");
183                if let Err(e) = self.shutdown().await {
184                    error!("Couldn't shut down: {}", e);
185                }
186            }
187            return Err(e);
188        }
189        Ok(())
190    }
191
192    pub async fn write_batch(&mut self, packets: impl Iterator<Item = &[u8]>) -> io::Result<()> {
193        if let Err(e) = write_raw_packets(
194            packets,
195            &mut self.write_stream,
196            self.compression_threshold,
197            &mut self.enc_cipher,
198        )
199        .await
200        {
201            // detect broken pipe
202            if e.kind() == io::ErrorKind::BrokenPipe {
203                info!("Broken pipe, shutting down connection.");
204                if let Err(e) = self.shutdown().await {
205                    error!("Couldn't shut down: {}", e);
206                }
207            }
208            return Err(e);
209        }
210        Ok(())
211    }
212
213    /// End the connection.
214    pub async fn shutdown(&mut self) -> io::Result<()> {
215        self.write_stream.shutdown().await
216    }
217}
218
219impl<R> ReadConnection<R>
220where
221    R: ProtocolPacket + Debug,
222{
223    /// Read a packet from the stream.
224    pub async fn read(&mut self) -> Result<R, Box<ReadPacketError>> {
225        let raw_packet = self.raw.read().await?;
226        deserialize_packet(&mut Cursor::new(&raw_packet))
227    }
228
229    /// Try to read a packet from the stream, or return Ok(None) if there's no
230    /// packet.
231    pub fn try_read(&mut self) -> Result<Option<R>, Box<ReadPacketError>> {
232        let Some(raw_packet) = self.raw.try_read()? else {
233            return Ok(None);
234        };
235        Ok(Some(deserialize_packet(&mut Cursor::new(&raw_packet))?))
236    }
237}
238impl<W> WriteConnection<W>
239where
240    W: ProtocolPacket + Debug,
241{
242    /// Write a packet to the server.
243    pub async fn write(&mut self, packet: W) -> io::Result<()> {
244        self.raw.write(&serialize_packet(&packet).unwrap()).await
245    }
246
247    /// Write a batch of packets to the server
248    pub async fn write_batch(&mut self, packets: &[W]) -> io::Result<()> {
249        let serialized_packets: Vec<Box<[u8]>> = packets
250            .iter()
251            .map(|packet| serialize_packet(packet).unwrap())
252            .collect();
253        self.raw
254            .write_batch(serialized_packets.iter().map(|data| data.as_ref()))
255            .await
256    }
257
258    /// End the connection.
259    pub async fn shutdown(&mut self) -> io::Result<()> {
260        self.raw.shutdown().await
261    }
262}
263
264impl<R, W> Connection<R, W>
265where
266    R: ProtocolPacket + Debug,
267    W: ProtocolPacket + Debug,
268{
269    /// Read a packet from the other side of the connection.
270    pub async fn read(&mut self) -> Result<R, Box<ReadPacketError>> {
271        self.reader.read().await
272    }
273
274    /// Try to read a packet from the other side of the connection, or return
275    /// Ok(None) if there's no packet to read.
276    pub fn try_read(&mut self) -> Result<Option<R>, Box<ReadPacketError>> {
277        self.reader.try_read()
278    }
279
280    /// Write a packet to the other side of the connection.
281    pub async fn write(&mut self, packet: impl crate::packets::Packet<W>) -> io::Result<()> {
282        let packet = packet.into_variant();
283        self.writer.write(packet).await
284    }
285
286    /// Write a batch of packets to the other side of the connection.
287    pub async fn write_batch(&mut self, packets: &[W]) -> io::Result<()> {
288        self.writer.write_batch(packets).await
289    }
290
291    /// Split the reader and writer into two objects.
292    ///
293    /// This doesn't allocate.
294    #[must_use]
295    pub fn into_split(self) -> (ReadConnection<R>, WriteConnection<W>) {
296        (self.reader, self.writer)
297    }
298
299    /// Split the reader and writer into the state-agnostic
300    /// [`RawReadConnection`] and [`RawWriteConnection`] types.
301    ///
302    /// This is meant to help with some types of proxies.
303    #[must_use]
304    pub fn into_split_raw(self) -> (RawReadConnection, RawWriteConnection) {
305        (self.reader.raw, self.writer.raw)
306    }
307}
308
309#[derive(Debug, Error)]
310pub enum ConnectionError {
311    #[error("{0}")]
312    Io(#[from] io::Error),
313}
314
315use socks5_impl::protocol::UserKey;
316
317/// An address and authentication method for connecting to a SOCKS5 proxy.
318#[derive(Clone, Debug)]
319pub struct Proxy {
320    pub addr: SocketAddr,
321    pub auth: Option<UserKey>,
322}
323
324impl Proxy {
325    pub fn new(addr: SocketAddr, auth: Option<UserKey>) -> Self {
326        Self { addr, auth }
327    }
328}
329impl Display for Proxy {
330    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
331        write!(f, "socks5://")?;
332        if let Some(auth) = &self.auth {
333            write!(f, "{auth}@")?;
334        }
335        write!(f, "{}", self.addr)
336    }
337}
338
#[cfg(feature = "online-mode")]
impl From<Proxy> for reqwest::Proxy {
    /// Convert by rendering the proxy as a URL string and handing it to
    /// `reqwest::Proxy::all`.
    ///
    /// # Panics
    ///
    /// Panics if reqwest rejects the rendered URL.
    fn from(proxy: Proxy) -> Self {
        let url = proxy.to_string();
        reqwest::Proxy::all(url)
            .expect("azalea proxies should not fail to parse as reqwest proxies")
    }
}
346
347impl Connection<ClientboundHandshakePacket, ServerboundHandshakePacket> {
348    /// Create a new connection to the given address.
349    pub async fn new(address: &SocketAddr) -> Result<Self, ConnectionError> {
350        let stream = TcpStream::connect(address).await?;
351
352        // enable tcp_nodelay
353        stream.set_nodelay(true)?;
354
355        Self::new_from_stream(stream).await
356    }
357
358    /// Create a new connection to the given address and SOCKS5 proxy.
359    ///
360    /// If you're not using a proxy, use [`Self::new`] instead.
361    pub async fn new_with_proxy(
362        address: &SocketAddr,
363        proxy: Proxy,
364    ) -> Result<Self, ConnectionError> {
365        let proxy_stream = TcpStream::connect(proxy.addr).await?;
366        let mut stream = BufStream::new(proxy_stream);
367
368        let _ = socks5_impl::client::connect(&mut stream, address, proxy.auth)
369            .await
370            .map_err(io::Error::other)?;
371
372        Self::new_from_stream(stream.into_inner()).await
373    }
374
375    /// Create a new connection from an existing stream.
376    ///
377    /// Useful if you want to set custom options on the stream. Otherwise, just
378    /// use [`Self::new`].
379    pub async fn new_from_stream(stream: TcpStream) -> Result<Self, ConnectionError> {
380        let (read_stream, write_stream) = stream.into_split();
381
382        Ok(Connection {
383            reader: ReadConnection {
384                raw: RawReadConnection {
385                    read_stream,
386                    buffer: Cursor::new(Vec::new()),
387                    compression_threshold: None,
388                    dec_cipher: None,
389                },
390                _reading: PhantomData,
391            },
392            writer: WriteConnection {
393                raw: RawWriteConnection {
394                    write_stream,
395                    compression_threshold: None,
396                    enc_cipher: None,
397                },
398                _writing: PhantomData,
399            },
400        })
401    }
402
403    /// Change our state from handshake to login.
404    ///
405    /// This is the state that is used for logging in.
406    #[must_use]
407    pub fn login(self) -> Connection<ClientboundLoginPacket, ServerboundLoginPacket> {
408        Connection::from(self)
409    }
410
411    /// Change our state from handshake to status.
412    ///
413    /// This is the state that is used for pinging the server.
414    #[must_use]
415    pub fn status(self) -> Connection<ClientboundStatusPacket, ServerboundStatusPacket> {
416        Connection::from(self)
417    }
418}
419
420impl Connection<ClientboundLoginPacket, ServerboundLoginPacket> {
421    /// Set our compression threshold, i.e. the maximum size that a packet is
422    /// allowed to be without getting compressed.
423    ///
424    /// Setting it to 0 means every packet will be compressed. If you set it to
425    /// less than 0 then compression is disabled.
426    pub fn set_compression_threshold(&mut self, threshold: i32) {
427        // if you pass a threshold of less than 0, compression is disabled
428        if threshold >= 0 {
429            self.reader.raw.compression_threshold = Some(threshold as u32);
430            self.writer.raw.compression_threshold = Some(threshold as u32);
431        } else {
432            self.reader.raw.compression_threshold = None;
433            self.writer.raw.compression_threshold = None;
434        }
435    }
436
437    /// Set the encryption key that is used to encrypt and decrypt packets.
438    ///
439    /// It's the same for both reading and writing.
440    pub fn set_encryption_key(&mut self, key: [u8; 16]) {
441        let (enc_cipher, dec_cipher) = azalea_crypto::create_cipher(&key);
442        self.reader.raw.dec_cipher = Some(dec_cipher);
443        self.writer.raw.enc_cipher = Some(enc_cipher);
444    }
445
446    /// Change our state from login to configuration.
447    ///
448    /// This is the state where the server sends us the registries and the
449    /// resource pack.
450    #[must_use]
451    pub fn config(self) -> Connection<ClientboundConfigPacket, ServerboundConfigPacket> {
452        Connection::from(self)
453    }
454
455    /// Authenticate with Minecraft's servers, which is required to join
456    /// online-mode servers.
457    ///
458    /// This must happen when you get a `ClientboundLoginPacket::Hello` packet.
459    ///
460    /// # Examples
461    ///
462    /// ```rust,no_run
463    /// use azalea_auth::AuthResult;
464    /// use azalea_protocol::{
465    ///     connect::Connection,
466    ///     packets::login::{ClientboundLoginPacket, ServerboundKey},
467    /// };
468    /// use uuid::Uuid;
469    /// # use azalea_protocol::ServerAddress;
470    ///
471    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
472    /// let AuthResult {
473    ///     access_token,
474    ///     profile,
475    /// } = azalea_auth::auth("[email protected]", azalea_auth::AuthOpts::default())
476    ///     .await
477    ///     .expect("Couldn't authenticate");
478    /// #
479    /// # let address = ServerAddress::try_from("[email protected]").unwrap();
480    /// # let resolved_address = azalea_protocol::resolver::resolve_address(&address).await?;
481    ///
482    /// let mut conn = Connection::new(&resolved_address).await?;
483    ///
484    /// // transition to the login state, in a real program we would have done a handshake first
485    /// let mut conn = conn.login();
486    ///
487    /// match conn.read().await? {
488    ///     ClientboundLoginPacket::Hello(p) => {
489    ///         // tell Mojang we're joining the server & enable encryption
490    ///         let e = azalea_crypto::encrypt(&p.public_key, &p.challenge).unwrap();
491    ///         conn.authenticate(&access_token, &profile.id, e.secret_key, &p, None)
492    ///             .await?;
493    ///         conn.write(ServerboundKey {
494    ///             key_bytes: e.encrypted_public_key,
495    ///             encrypted_challenge: e.encrypted_challenge,
496    ///         })
497    ///         .await?;
498    ///         conn.set_encryption_key(e.secret_key);
499    ///     }
500    ///     _ => {}
501    /// }
502    /// # Ok(())
503    /// # }
504    /// ```
505    #[cfg(feature = "online-mode")]
506    pub async fn authenticate(
507        &self,
508        access_token: &str,
509        uuid: &Uuid,
510        private_key: [u8; 16],
511        packet: &ClientboundHello,
512        sessionserver_proxy: Option<Proxy>,
513    ) -> Result<(), ClientSessionServerError> {
514        use azalea_auth::sessionserver::{self, SessionServerJoinOpts};
515
516        sessionserver::join(SessionServerJoinOpts {
517            access_token,
518            public_key: &packet.public_key,
519            private_key: &private_key,
520            uuid,
521            server_id: &packet.server_id,
522            proxy: sessionserver_proxy.map(Proxy::into),
523        })
524        .await
525    }
526}
527
528impl Connection<ServerboundHandshakePacket, ClientboundHandshakePacket> {
529    /// Change our state from handshake to login.
530    ///
531    /// This is the state that is used while negotiating encryption and
532    /// authenticating with Mojang.
533    #[must_use]
534    pub fn login(self) -> Connection<ServerboundLoginPacket, ClientboundLoginPacket> {
535        Connection::from(self)
536    }
537
538    /// Change our state from handshake to status.
539    ///
540    /// This is the state that is used for pinging the server.
541    #[must_use]
542    pub fn status(self) -> Connection<ServerboundStatusPacket, ClientboundStatusPacket> {
543        Connection::from(self)
544    }
545}
546
547impl Connection<ServerboundLoginPacket, ClientboundLoginPacket> {
548    /// Set our compression threshold, i.e. the maximum size that a packet is
549    /// allowed to be without getting compressed.
550    ///
551    /// If you set it to less than 0 then compression gets disabled.
552    pub fn set_compression_threshold(&mut self, threshold: i32) {
553        // if you pass a threshold of less than 0, compression is disabled
554        if threshold >= 0 {
555            self.reader.raw.compression_threshold = Some(threshold as u32);
556            self.writer.raw.compression_threshold = Some(threshold as u32);
557        } else {
558            self.reader.raw.compression_threshold = None;
559            self.writer.raw.compression_threshold = None;
560        }
561    }
562
563    /// Set the encryption key that is used to encrypt and decrypt packets.
564    ///
565    /// It's the same for both reading and writing.
566    pub fn set_encryption_key(&mut self, key: [u8; 16]) {
567        let (enc_cipher, dec_cipher) = azalea_crypto::create_cipher(&key);
568        self.reader.raw.dec_cipher = Some(dec_cipher);
569        self.writer.raw.enc_cipher = Some(enc_cipher);
570    }
571
572    /// Change our state from login to game.
573    ///
574    /// This is the state that's used when the client is actually in the game.
575    #[must_use]
576    pub fn game(self) -> Connection<ServerboundGamePacket, ClientboundGamePacket> {
577        Connection::from(self)
578    }
579
580    /// Verify connecting clients have authenticated with Minecraft's servers.
581    /// This must happen after the client sends a `ServerboundLoginPacket::Key`
582    /// packet.
583    #[cfg(feature = "online-mode")]
584    pub async fn authenticate(
585        &self,
586        username: &str,
587        public_key: &[u8],
588        private_key: &[u8; 16],
589        ip: Option<&str>,
590    ) -> Result<GameProfile, ServerSessionServerError> {
591        azalea_auth::sessionserver::serverside_auth(username, public_key, private_key, ip).await
592    }
593
594    /// Change our state back to configuration.
595    #[must_use]
596    pub fn config(self) -> Connection<ServerboundConfigPacket, ClientboundConfigPacket> {
597        Connection::from(self)
598    }
599}
600
601impl Connection<ServerboundConfigPacket, ClientboundConfigPacket> {
602    /// Change our state from configuration to game.
603    ///
604    /// This is the state that's used when the client is actually in the world.
605    #[must_use]
606    pub fn game(self) -> Connection<ServerboundGamePacket, ClientboundGamePacket> {
607        Connection::from(self)
608    }
609}
610
611impl Connection<ClientboundConfigPacket, ServerboundConfigPacket> {
612    /// Change our state from configuration to game.
613    ///
614    /// This is the state that's used when the client is actually in the world.
615    #[must_use]
616    pub fn game(self) -> Connection<ClientboundGamePacket, ServerboundGamePacket> {
617        Connection::from(self)
618    }
619}
620
621impl Connection<ClientboundGamePacket, ServerboundGamePacket> {
622    /// Change our state back to configuration.
623    #[must_use]
624    pub fn config(self) -> Connection<ClientboundConfigPacket, ServerboundConfigPacket> {
625        Connection::from(self)
626    }
627}
628impl Connection<ServerboundGamePacket, ClientboundGamePacket> {
629    /// Change our state back to configuration.
630    #[must_use]
631    pub fn config(self) -> Connection<ServerboundConfigPacket, ClientboundConfigPacket> {
632        Connection::from(self)
633    }
634}
635
636// rust doesn't let us implement From because allegedly it conflicts with
637// `core`'s "impl<T> From<T> for T" so we do this instead
638impl<R1, W1> Connection<R1, W1>
639where
640    R1: ProtocolPacket + Debug,
641    W1: ProtocolPacket + Debug,
642{
643    /// Creates a `Connection` of a type from a `Connection` of another type.
644    /// Useful for servers or custom packets.
645    #[must_use]
646    pub fn from<R2, W2>(connection: Connection<R1, W1>) -> Connection<R2, W2>
647    where
648        R2: ProtocolPacket + Debug,
649        W2: ProtocolPacket + Debug,
650    {
651        Connection {
652            reader: ReadConnection {
653                raw: connection.reader.raw,
654                _reading: PhantomData,
655            },
656            writer: WriteConnection {
657                raw: connection.writer.raw,
658                _writing: PhantomData,
659            },
660        }
661    }
662
663    /// Convert an existing `TcpStream` into a `Connection`. Useful for servers.
664    pub fn wrap(stream: TcpStream) -> Connection<R1, W1> {
665        let (read_stream, write_stream) = stream.into_split();
666
667        Connection {
668            reader: ReadConnection {
669                raw: RawReadConnection {
670                    read_stream,
671                    buffer: Cursor::new(Vec::new()),
672                    compression_threshold: None,
673                    dec_cipher: None,
674                },
675                _reading: PhantomData,
676            },
677            writer: WriteConnection {
678                raw: RawWriteConnection {
679                    write_stream,
680                    compression_threshold: None,
681                    enc_cipher: None,
682                },
683                _writing: PhantomData,
684            },
685        }
686    }
687
688    /// Convert from a `Connection` into a `TcpStream`. Useful for servers.
689    pub fn unwrap(self) -> Result<TcpStream, ReuniteError> {
690        self.reader
691            .raw
692            .read_stream
693            .reunite(self.writer.raw.write_stream)
694    }
695}