pub struct Connection<R: ProtocolPacket, W: ProtocolPacket> {
pub reader: ReadConnection<R>,
pub writer: WriteConnection<W>,
}Expand description
A connection that can read and write packets.
§Examples
Join an offline-mode server and go through the handshake.
use azalea_protocol::{
connect::Connection,
packets::{
self, ClientIntention, PROTOCOL_VERSION,
handshake::ServerboundIntention,
login::{ClientboundLoginPacket, ServerboundHello, ServerboundKey},
},
resolver,
};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let resolved_address = resolver::resolve_address(&"localhost".try_into().unwrap()).await?;
let mut conn = Connection::new(&resolved_address).await?;
// handshake
conn.write(ServerboundIntention {
protocol_version: PROTOCOL_VERSION,
hostname: resolved_address.ip().to_string(),
port: resolved_address.port(),
intention: ClientIntention::Login,
})
.await?;
let mut conn = conn.login();
// login
conn.write(ServerboundHello {
name: "bot".to_owned(),
profile_id: uuid::Uuid::nil(),
})
.await?;
let (conn, game_profile) = loop {
let packet = conn.read().await?;
match packet {
ClientboundLoginPacket::Hello(p) => {
let e = azalea_crypto::encrypt(&p.public_key, &p.challenge).unwrap();
conn.write(ServerboundKey {
key_bytes: e.encrypted_public_key,
encrypted_challenge: e.encrypted_challenge,
})
.await?;
conn.set_encryption_key(e.secret_key);
}
ClientboundLoginPacket::LoginCompression(p) => {
conn.set_compression_threshold(p.compression_threshold);
}
ClientboundLoginPacket::LoginFinished(p) => {
break (conn.config(), p.game_profile);
}
ClientboundLoginPacket::LoginDisconnect(p) => {
eprintln!("login disconnect: {}", p.reason);
return Err("login disconnect".into());
}
ClientboundLoginPacket::CustomQuery(p) => {}
ClientboundLoginPacket::CookieRequest(p) => {
conn.write(packets::login::ServerboundCookieResponse {
key: p.key,
payload: None,
})
.await?;
}
}
};
Ok(())
}Fields§
§reader: ReadConnection<R>§writer: WriteConnection<W>Implementations§
Source§impl<R, W> Connection<R, W>
impl<R, W> Connection<R, W>
Sourcepub async fn read(&mut self) -> Result<R, Box<ReadPacketError>>
pub async fn read(&mut self) -> Result<R, Box<ReadPacketError>>
Read a packet from the other side of the connection.
Examples found in repository?
97async fn handle_connection(stream: TcpStream) -> eyre::Result<()> {
98 stream.set_nodelay(true)?;
99 let ip = stream.peer_addr()?;
100 let mut conn: Connection<ServerboundHandshakePacket, ClientboundHandshakePacket> =
101 Connection::wrap(stream);
102
103 // The first packet sent from a client is the intent packet.
104 // This specifies whether the client is pinging
105 // the server or is going to join the game.
106 let intent = match conn.read().await {
107 Ok(packet) => match packet {
108 ServerboundHandshakePacket::Intention(packet) => {
109 info!(
110 "New connection from {}, hostname {:?}:{}, version {}, {:?}",
111 ip.ip(),
112 packet.hostname,
113 packet.port,
114 packet.protocol_version,
115 packet.intention
116 );
117 packet
118 }
119 },
120 Err(e) => {
121 let e = e.into();
122 warn!("Error during intent: {e}");
123 return Err(e);
124 }
125 };
126
127 match intent.intention {
128 // If the client is pinging the proxy, reply with the information below.
129 ClientIntention::Status => {
130 let mut conn = conn.status();
131 loop {
132 match conn.read().await {
133 Ok(p) => match p {
134 ServerboundStatusPacket::StatusRequest(_) => {
135 conn.write(ClientboundStatusResponse {
136 description: PROXY_DESC.into(),
137 favicon: PROXY_FAVICON.clone(),
138 players: PROXY_PLAYERS.clone(),
139 version: PROXY_VERSION.clone(),
140 enforces_secure_chat: PROXY_SECURE_CHAT,
141 })
142 .await?;
143 }
144 ServerboundStatusPacket::PingRequest(p) => {
145 conn.write(ClientboundPongResponse { time: p.time }).await?;
146 break;
147 }
148 },
149 Err(e) => match *e {
150 ReadPacketError::ConnectionClosed => {
151 break;
152 }
153 e => {
154 warn!("Error during status: {e}");
155 return Err(e.into());
156 }
157 },
158 }
159 }
160 }
161 // If the client intends to join the proxy, wait for them to send the `Hello` packet to log
162 // their username and uuid, then start proxying their connection.
163 ClientIntention::Login => {
164 let mut conn = conn.login();
165 loop {
166 match conn.read().await {
167 Ok(p) => {
168 if let ServerboundLoginPacket::Hello(hello) = p {
169 info!(
170 "Player \'{}\' from {} logging in with uuid: {}",
171 hello.name,
172 ip.ip(),
173 hello.profile_id.to_string()
174 );
175
176 tokio::spawn(proxy_conn(conn).map(|r| {
177 if let Err(e) = r {
178 error!("Failed to proxy: {e}");
179 }
180 }));
181
182 break;
183 }
184 }
185 Err(e) => match *e {
186 ReadPacketError::ConnectionClosed => {
187 break;
188 }
189 e => {
190 warn!("Error during login: {e}");
191 return Err(e.into());
192 }
193 },
194 }
195 }
196 }
197 ClientIntention::Transfer => {
198 warn!("Client attempted to join via transfer")
199 }
200 }
201
202 Ok(())
203}
204
205async fn proxy_conn(
206 mut client_conn: Connection<ServerboundLoginPacket, ClientboundLoginPacket>,
207) -> Result<(), Box<dyn Error>> {
208 // resolve TARGET_ADDR
209 let parsed_target_addr = ServerAddr::try_from(TARGET_ADDR).unwrap();
210 let resolved_target_addr = resolve_address(&parsed_target_addr).await?;
211
212 let mut server_conn = Connection::new(&resolved_target_addr).await?;
213
214 let account = if ACCOUNT.contains('@') {
215 Account::microsoft(ACCOUNT).await?
216 } else {
217 Account::offline(ACCOUNT)
218 };
219 println!("got account: {:?}", account);
220
221 server_conn
222 .write(ServerboundIntention {
223 protocol_version: PROTOCOL_VERSION,
224 hostname: parsed_target_addr.host,
225 port: parsed_target_addr.port,
226 intention: ClientIntention::Login,
227 })
228 .await?;
229 let mut server_conn = server_conn.login();
230
231 // login
232 server_conn
233 .write(ServerboundHello {
234 name: account.username().to_owned(),
235 profile_id: account.uuid(),
236 })
237 .await?;
238
239 let (server_conn, login_finished) = loop {
240 let packet = server_conn.read().await?;
241
242 println!("got packet: {:?}", packet);
243
244 match packet {
245 ClientboundLoginPacket::Hello(p) => {
246 debug!("Got encryption request");
247 let e = azalea_crypto::encrypt(&p.public_key, &p.challenge).unwrap();
248
249 if let Some(access_token) = account.access_token() {
250 // keep track of the number of times we tried
251 // authenticating so we can give up after too many
252 let mut attempts: usize = 1;
253
254 while let Err(e) = {
255 server_conn
256 .authenticate(&access_token, &account.uuid(), e.secret_key, &p, None)
257 .await
258 } {
259 if attempts >= 2 {
260 // if this is the second attempt and we failed
261 // both times, give up
262 return Err(e.into());
263 }
264 if matches!(
265 e,
266 ClientSessionServerError::InvalidSession
267 | ClientSessionServerError::ForbiddenOperation
268 ) {
269 // uh oh, we got an invalid session and have
270 // to reauthenticate now
271 account.refresh().await?;
272 } else {
273 return Err(e.into());
274 }
275 attempts += 1;
276 }
277 }
278
279 server_conn
280 .write(ServerboundKey {
281 key_bytes: e.encrypted_public_key,
282 encrypted_challenge: e.encrypted_challenge,
283 })
284 .await?;
285
286 server_conn.set_encryption_key(e.secret_key);
287 }
288 ClientboundLoginPacket::LoginCompression(p) => {
289 debug!("Got compression request {:?}", p.compression_threshold);
290 server_conn.set_compression_threshold(p.compression_threshold);
291 }
292 ClientboundLoginPacket::LoginFinished(p) => {
293 debug!(
294 "Got profile {:?}. handshake is finished and we're now switching to the configuration state",
295 p.game_profile
296 );
297 // server_conn.write(ServerboundLoginAcknowledged {}).await?;
298 break (server_conn.config(), p);
299 }
300 ClientboundLoginPacket::LoginDisconnect(p) => {
301 error!("Got disconnect {p:?}");
302 return Err("Disconnected".into());
303 }
304 ClientboundLoginPacket::CustomQuery(p) => {
305 debug!("Got custom query {:?}", p);
306 // replying to custom query is done in
307 // packet_handling::login::process_packet_events
308 }
309 ClientboundLoginPacket::CookieRequest(p) => {
310 debug!("Got cookie request {:?}", p);
311
312 server_conn
313 .write(packets::login::ServerboundCookieResponse {
314 key: p.key,
315 // cookies aren't implemented
316 payload: None,
317 })
318 .await?;
319 }
320 }
321 };
322
323 // give the client the login_finished
324 println!("got the login_finished: {:?}", login_finished);
325 client_conn.write(login_finished).await?;
326 let client_conn = client_conn.config();
327
328 info!("started direct bridging");
329
330 // bridge packets
331 let listen_raw_reader = client_conn.reader.raw;
332 let listen_raw_writer = client_conn.writer.raw;
333
334 let target_raw_reader = server_conn.reader.raw;
335 let target_raw_writer = server_conn.writer.raw;
336
337 let packet_logs_txt = Arc::new(tokio::sync::Mutex::new(
338 File::create("combined.txt").await.unwrap(),
339 ));
340
341 let packet_logs_txt_clone = packet_logs_txt.clone();
342 let copy_listen_to_target = tokio::spawn(async move {
343 let mut listen_raw_reader = listen_raw_reader;
344 let mut target_raw_writer = target_raw_writer;
345
346 let packet_logs_txt = packet_logs_txt_clone;
347
348 let mut serverbound_parsed_txt = File::create("serverbound.txt").await.unwrap();
349
350 loop {
351 let packet = match listen_raw_reader.read().await {
352 Ok(p) => p,
353 Err(e) => {
354 error!("Error reading packet from listen: {e}");
355 return;
356 }
357 };
358
359 // decode as a game packet
360 let decoded_packet = azalea_protocol::read::deserialize_packet::<ServerboundGamePacket>(
361 &mut Cursor::new(&packet),
362 );
363
364 if let Ok(decoded_packet) = decoded_packet {
365 let timestamp = chrono::Utc::now();
366 let _ = serverbound_parsed_txt
367 .write_all(format!("{timestamp} {:?}\n", decoded_packet).as_bytes())
368 .await;
369 let _ = packet_logs_txt
370 .lock()
371 .await
372 .write_all(format!("{timestamp} <- {:?}\n", decoded_packet).as_bytes())
373 .await;
374 }
375
376 match target_raw_writer.write(&packet).await {
377 Ok(_) => {}
378 Err(e) => {
379 error!("Error writing packet to target: {e}");
380 return;
381 }
382 }
383 }
384 });
385
386 // write to clientbound.txt in a separate task so it doesn't block receiving
387 // packets
388 let (clientbound_tx, mut clientbound_rx) = tokio::sync::mpsc::unbounded_channel::<Box<[u8]>>();
389 let copy_clientbound_to_file = tokio::spawn(async move {
390 let mut clientbound_parsed_txt = File::create("clientbound.txt").await.unwrap();
391
392 loop {
393 let Some(packet) = clientbound_rx.recv().await else {
394 return;
395 };
396
397 // decode as a game packet
398 let decoded_packet = azalea_protocol::read::deserialize_packet::<ClientboundGamePacket>(
399 &mut Cursor::new(&packet),
400 );
401
402 if let Ok(decoded_packet) = decoded_packet {
403 let timestamp = chrono::Utc::now();
404 let _ = clientbound_parsed_txt
405 .write_all(format!("{timestamp} {decoded_packet:?}\n").as_bytes())
406 .await;
407 let _ = packet_logs_txt
408 .lock()
409 .await
410 .write_all(format!("{timestamp} -> {decoded_packet:?}\n").as_bytes())
411 .await;
412 }
413 }
414 });
415
416 let copy_remote_to_local = tokio::spawn(async move {
417 let mut target_raw_reader = target_raw_reader;
418 let mut listen_raw_writer = listen_raw_writer;
419
420 loop {
421 let packet = match target_raw_reader.read().await {
422 Ok(p) => p,
423 Err(e) => {
424 error!("Error reading packet from target: {e}");
425 return;
426 }
427 };
428
429 clientbound_tx.send(packet.clone()).unwrap();
430
431 match listen_raw_writer.write(&packet).await {
432 Ok(_) => {}
433 Err(e) => {
434 error!("Error writing packet to listen: {e}");
435 return;
436 }
437 }
438 }
439 });
440
441 tokio::try_join!(
442 copy_listen_to_target,
443 copy_remote_to_local,
444 copy_clientbound_to_file
445 )?;
446
447 Ok(())
448}More examples
67async fn handle_connection(stream: TcpStream) -> eyre::Result<()> {
68 stream.set_nodelay(true)?;
69 let ip = stream.peer_addr()?;
70 let mut conn: Connection<ServerboundHandshakePacket, ClientboundHandshakePacket> =
71 Connection::wrap(stream);
72
73 // The first packet sent from a client is the intent packet.
74 // This specifies whether the client is pinging
75 // the server or is going to join the game.
76 let intent = match conn.read().await {
77 Ok(packet) => match packet {
78 ServerboundHandshakePacket::Intention(packet) => {
79 info!(
80 "New connection from {}, hostname {:?}:{}, version {}, {:?}",
81 ip.ip(),
82 packet.hostname,
83 packet.port,
84 packet.protocol_version,
85 packet.intention
86 );
87 packet
88 }
89 },
90 Err(e) => {
91 let e = e.into();
92 warn!("Error during intent: {e}");
93 return Err(e);
94 }
95 };
96
97 match intent.intention {
98 // If the client is pinging the proxy,
99 // reply with the information below.
100 ClientIntention::Status => {
101 let mut conn = conn.status();
102 loop {
103 match conn.read().await {
104 Ok(p) => match p {
105 ServerboundStatusPacket::StatusRequest(_) => {
106 conn.write(ClientboundStatusResponse {
107 description: PROXY_DESC.into(),
108 favicon: PROXY_FAVICON.clone(),
109 players: PROXY_PLAYERS.clone(),
110 version: PROXY_VERSION.clone(),
111 enforces_secure_chat: PROXY_SECURE_CHAT,
112 })
113 .await?;
114 }
115 ServerboundStatusPacket::PingRequest(p) => {
116 conn.write(ClientboundPongResponse { time: p.time }).await?;
117 break;
118 }
119 },
120 Err(e) => match *e {
121 ReadPacketError::ConnectionClosed => {
122 break;
123 }
124 e => {
125 warn!("Error during status: {e}");
126 return Err(e.into());
127 }
128 },
129 }
130 }
131 }
132 // If the client intends to join the proxy,
133 // wait for them to send the `Hello` packet to
134 // log their username and uuid, then forward the
135 // connection along to the proxy target.
136 ClientIntention::Login => {
137 let mut conn = conn.login();
138 loop {
139 match conn.read().await {
140 Ok(p) => {
141 if let ServerboundLoginPacket::Hello(hello) = p {
142 info!(
143 "Player \'{0}\' from {1} logging in with uuid: {2}",
144 hello.name,
145 ip.ip(),
146 hello.profile_id.to_string()
147 );
148
149 tokio::spawn(transfer(conn.unwrap()?, intent, hello).map(|r| {
150 if let Err(e) = r {
151 error!("Failed to proxy: {e}");
152 }
153 }));
154
155 break;
156 }
157 }
158 Err(e) => match *e {
159 ReadPacketError::ConnectionClosed => {
160 break;
161 }
162 e => {
163 warn!("Error during login: {e}");
164 return Err(e.into());
165 }
166 },
167 }
168 }
169 }
170 ClientIntention::Transfer => {
171 warn!("Client attempted to join via transfer")
172 }
173 }
174
175 Ok(())
176}Sourcepub fn try_read(&mut self) -> Result<Option<R>, Box<ReadPacketError>>
pub fn try_read(&mut self) -> Result<Option<R>, Box<ReadPacketError>>
Try to read a packet from the other side of the connection, or return Ok(None) if there’s no packet to read.
Sourcepub async fn write(&mut self, packet: impl Packet<W>) -> Result<()>
pub async fn write(&mut self, packet: impl Packet<W>) -> Result<()>
Write a packet to the other side of the connection.
Examples found in repository?
97async fn handle_connection(stream: TcpStream) -> eyre::Result<()> {
98 stream.set_nodelay(true)?;
99 let ip = stream.peer_addr()?;
100 let mut conn: Connection<ServerboundHandshakePacket, ClientboundHandshakePacket> =
101 Connection::wrap(stream);
102
103 // The first packet sent from a client is the intent packet.
104 // This specifies whether the client is pinging
105 // the server or is going to join the game.
106 let intent = match conn.read().await {
107 Ok(packet) => match packet {
108 ServerboundHandshakePacket::Intention(packet) => {
109 info!(
110 "New connection from {}, hostname {:?}:{}, version {}, {:?}",
111 ip.ip(),
112 packet.hostname,
113 packet.port,
114 packet.protocol_version,
115 packet.intention
116 );
117 packet
118 }
119 },
120 Err(e) => {
121 let e = e.into();
122 warn!("Error during intent: {e}");
123 return Err(e);
124 }
125 };
126
127 match intent.intention {
128 // If the client is pinging the proxy, reply with the information below.
129 ClientIntention::Status => {
130 let mut conn = conn.status();
131 loop {
132 match conn.read().await {
133 Ok(p) => match p {
134 ServerboundStatusPacket::StatusRequest(_) => {
135 conn.write(ClientboundStatusResponse {
136 description: PROXY_DESC.into(),
137 favicon: PROXY_FAVICON.clone(),
138 players: PROXY_PLAYERS.clone(),
139 version: PROXY_VERSION.clone(),
140 enforces_secure_chat: PROXY_SECURE_CHAT,
141 })
142 .await?;
143 }
144 ServerboundStatusPacket::PingRequest(p) => {
145 conn.write(ClientboundPongResponse { time: p.time }).await?;
146 break;
147 }
148 },
149 Err(e) => match *e {
150 ReadPacketError::ConnectionClosed => {
151 break;
152 }
153 e => {
154 warn!("Error during status: {e}");
155 return Err(e.into());
156 }
157 },
158 }
159 }
160 }
161 // If the client intends to join the proxy, wait for them to send the `Hello` packet to log
162 // their username and uuid, then start proxying their connection.
163 ClientIntention::Login => {
164 let mut conn = conn.login();
165 loop {
166 match conn.read().await {
167 Ok(p) => {
168 if let ServerboundLoginPacket::Hello(hello) = p {
169 info!(
170 "Player \'{}\' from {} logging in with uuid: {}",
171 hello.name,
172 ip.ip(),
173 hello.profile_id.to_string()
174 );
175
176 tokio::spawn(proxy_conn(conn).map(|r| {
177 if let Err(e) = r {
178 error!("Failed to proxy: {e}");
179 }
180 }));
181
182 break;
183 }
184 }
185 Err(e) => match *e {
186 ReadPacketError::ConnectionClosed => {
187 break;
188 }
189 e => {
190 warn!("Error during login: {e}");
191 return Err(e.into());
192 }
193 },
194 }
195 }
196 }
197 ClientIntention::Transfer => {
198 warn!("Client attempted to join via transfer")
199 }
200 }
201
202 Ok(())
203}
204
205async fn proxy_conn(
206 mut client_conn: Connection<ServerboundLoginPacket, ClientboundLoginPacket>,
207) -> Result<(), Box<dyn Error>> {
208 // resolve TARGET_ADDR
209 let parsed_target_addr = ServerAddr::try_from(TARGET_ADDR).unwrap();
210 let resolved_target_addr = resolve_address(&parsed_target_addr).await?;
211
212 let mut server_conn = Connection::new(&resolved_target_addr).await?;
213
214 let account = if ACCOUNT.contains('@') {
215 Account::microsoft(ACCOUNT).await?
216 } else {
217 Account::offline(ACCOUNT)
218 };
219 println!("got account: {:?}", account);
220
221 server_conn
222 .write(ServerboundIntention {
223 protocol_version: PROTOCOL_VERSION,
224 hostname: parsed_target_addr.host,
225 port: parsed_target_addr.port,
226 intention: ClientIntention::Login,
227 })
228 .await?;
229 let mut server_conn = server_conn.login();
230
231 // login
232 server_conn
233 .write(ServerboundHello {
234 name: account.username().to_owned(),
235 profile_id: account.uuid(),
236 })
237 .await?;
238
239 let (server_conn, login_finished) = loop {
240 let packet = server_conn.read().await?;
241
242 println!("got packet: {:?}", packet);
243
244 match packet {
245 ClientboundLoginPacket::Hello(p) => {
246 debug!("Got encryption request");
247 let e = azalea_crypto::encrypt(&p.public_key, &p.challenge).unwrap();
248
249 if let Some(access_token) = account.access_token() {
250 // keep track of the number of times we tried
251 // authenticating so we can give up after too many
252 let mut attempts: usize = 1;
253
254 while let Err(e) = {
255 server_conn
256 .authenticate(&access_token, &account.uuid(), e.secret_key, &p, None)
257 .await
258 } {
259 if attempts >= 2 {
260 // if this is the second attempt and we failed
261 // both times, give up
262 return Err(e.into());
263 }
264 if matches!(
265 e,
266 ClientSessionServerError::InvalidSession
267 | ClientSessionServerError::ForbiddenOperation
268 ) {
269 // uh oh, we got an invalid session and have
270 // to reauthenticate now
271 account.refresh().await?;
272 } else {
273 return Err(e.into());
274 }
275 attempts += 1;
276 }
277 }
278
279 server_conn
280 .write(ServerboundKey {
281 key_bytes: e.encrypted_public_key,
282 encrypted_challenge: e.encrypted_challenge,
283 })
284 .await?;
285
286 server_conn.set_encryption_key(e.secret_key);
287 }
288 ClientboundLoginPacket::LoginCompression(p) => {
289 debug!("Got compression request {:?}", p.compression_threshold);
290 server_conn.set_compression_threshold(p.compression_threshold);
291 }
292 ClientboundLoginPacket::LoginFinished(p) => {
293 debug!(
294 "Got profile {:?}. handshake is finished and we're now switching to the configuration state",
295 p.game_profile
296 );
297 // server_conn.write(ServerboundLoginAcknowledged {}).await?;
298 break (server_conn.config(), p);
299 }
300 ClientboundLoginPacket::LoginDisconnect(p) => {
301 error!("Got disconnect {p:?}");
302 return Err("Disconnected".into());
303 }
304 ClientboundLoginPacket::CustomQuery(p) => {
305 debug!("Got custom query {:?}", p);
306 // replying to custom query is done in
307 // packet_handling::login::process_packet_events
308 }
309 ClientboundLoginPacket::CookieRequest(p) => {
310 debug!("Got cookie request {:?}", p);
311
312 server_conn
313 .write(packets::login::ServerboundCookieResponse {
314 key: p.key,
315 // cookies aren't implemented
316 payload: None,
317 })
318 .await?;
319 }
320 }
321 };
322
323 // give the client the login_finished
324 println!("got the login_finished: {:?}", login_finished);
325 client_conn.write(login_finished).await?;
326 let client_conn = client_conn.config();
327
328 info!("started direct bridging");
329
330 // bridge packets
331 let listen_raw_reader = client_conn.reader.raw;
332 let listen_raw_writer = client_conn.writer.raw;
333
334 let target_raw_reader = server_conn.reader.raw;
335 let target_raw_writer = server_conn.writer.raw;
336
337 let packet_logs_txt = Arc::new(tokio::sync::Mutex::new(
338 File::create("combined.txt").await.unwrap(),
339 ));
340
341 let packet_logs_txt_clone = packet_logs_txt.clone();
342 let copy_listen_to_target = tokio::spawn(async move {
343 let mut listen_raw_reader = listen_raw_reader;
344 let mut target_raw_writer = target_raw_writer;
345
346 let packet_logs_txt = packet_logs_txt_clone;
347
348 let mut serverbound_parsed_txt = File::create("serverbound.txt").await.unwrap();
349
350 loop {
351 let packet = match listen_raw_reader.read().await {
352 Ok(p) => p,
353 Err(e) => {
354 error!("Error reading packet from listen: {e}");
355 return;
356 }
357 };
358
359 // decode as a game packet
360 let decoded_packet = azalea_protocol::read::deserialize_packet::<ServerboundGamePacket>(
361 &mut Cursor::new(&packet),
362 );
363
364 if let Ok(decoded_packet) = decoded_packet {
365 let timestamp = chrono::Utc::now();
366 let _ = serverbound_parsed_txt
367 .write_all(format!("{timestamp} {:?}\n", decoded_packet).as_bytes())
368 .await;
369 let _ = packet_logs_txt
370 .lock()
371 .await
372 .write_all(format!("{timestamp} <- {:?}\n", decoded_packet).as_bytes())
373 .await;
374 }
375
376 match target_raw_writer.write(&packet).await {
377 Ok(_) => {}
378 Err(e) => {
379 error!("Error writing packet to target: {e}");
380 return;
381 }
382 }
383 }
384 });
385
386 // write to clientbound.txt in a separate task so it doesn't block receiving
387 // packets
388 let (clientbound_tx, mut clientbound_rx) = tokio::sync::mpsc::unbounded_channel::<Box<[u8]>>();
389 let copy_clientbound_to_file = tokio::spawn(async move {
390 let mut clientbound_parsed_txt = File::create("clientbound.txt").await.unwrap();
391
392 loop {
393 let Some(packet) = clientbound_rx.recv().await else {
394 return;
395 };
396
397 // decode as a game packet
398 let decoded_packet = azalea_protocol::read::deserialize_packet::<ClientboundGamePacket>(
399 &mut Cursor::new(&packet),
400 );
401
402 if let Ok(decoded_packet) = decoded_packet {
403 let timestamp = chrono::Utc::now();
404 let _ = clientbound_parsed_txt
405 .write_all(format!("{timestamp} {decoded_packet:?}\n").as_bytes())
406 .await;
407 let _ = packet_logs_txt
408 .lock()
409 .await
410 .write_all(format!("{timestamp} -> {decoded_packet:?}\n").as_bytes())
411 .await;
412 }
413 }
414 });
415
416 let copy_remote_to_local = tokio::spawn(async move {
417 let mut target_raw_reader = target_raw_reader;
418 let mut listen_raw_writer = listen_raw_writer;
419
420 loop {
421 let packet = match target_raw_reader.read().await {
422 Ok(p) => p,
423 Err(e) => {
424 error!("Error reading packet from target: {e}");
425 return;
426 }
427 };
428
429 clientbound_tx.send(packet.clone()).unwrap();
430
431 match listen_raw_writer.write(&packet).await {
432 Ok(_) => {}
433 Err(e) => {
434 error!("Error writing packet to listen: {e}");
435 return;
436 }
437 }
438 }
439 });
440
441 tokio::try_join!(
442 copy_listen_to_target,
443 copy_remote_to_local,
444 copy_clientbound_to_file
445 )?;
446
447 Ok(())
448}More examples
67async fn handle_connection(stream: TcpStream) -> eyre::Result<()> {
68 stream.set_nodelay(true)?;
69 let ip = stream.peer_addr()?;
70 let mut conn: Connection<ServerboundHandshakePacket, ClientboundHandshakePacket> =
71 Connection::wrap(stream);
72
73 // The first packet sent from a client is the intent packet.
74 // This specifies whether the client is pinging
75 // the server or is going to join the game.
76 let intent = match conn.read().await {
77 Ok(packet) => match packet {
78 ServerboundHandshakePacket::Intention(packet) => {
79 info!(
80 "New connection from {}, hostname {:?}:{}, version {}, {:?}",
81 ip.ip(),
82 packet.hostname,
83 packet.port,
84 packet.protocol_version,
85 packet.intention
86 );
87 packet
88 }
89 },
90 Err(e) => {
91 let e = e.into();
92 warn!("Error during intent: {e}");
93 return Err(e);
94 }
95 };
96
97 match intent.intention {
98 // If the client is pinging the proxy,
99 // reply with the information below.
100 ClientIntention::Status => {
101 let mut conn = conn.status();
102 loop {
103 match conn.read().await {
104 Ok(p) => match p {
105 ServerboundStatusPacket::StatusRequest(_) => {
106 conn.write(ClientboundStatusResponse {
107 description: PROXY_DESC.into(),
108 favicon: PROXY_FAVICON.clone(),
109 players: PROXY_PLAYERS.clone(),
110 version: PROXY_VERSION.clone(),
111 enforces_secure_chat: PROXY_SECURE_CHAT,
112 })
113 .await?;
114 }
115 ServerboundStatusPacket::PingRequest(p) => {
116 conn.write(ClientboundPongResponse { time: p.time }).await?;
117 break;
118 }
119 },
120 Err(e) => match *e {
121 ReadPacketError::ConnectionClosed => {
122 break;
123 }
124 e => {
125 warn!("Error during status: {e}");
126 return Err(e.into());
127 }
128 },
129 }
130 }
131 }
132 // If the client intends to join the proxy,
133 // wait for them to send the `Hello` packet to
134 // log their username and uuid, then forward the
135 // connection along to the proxy target.
136 ClientIntention::Login => {
137 let mut conn = conn.login();
138 loop {
139 match conn.read().await {
140 Ok(p) => {
141 if let ServerboundLoginPacket::Hello(hello) = p {
142 info!(
143 "Player \'{0}\' from {1} logging in with uuid: {2}",
144 hello.name,
145 ip.ip(),
146 hello.profile_id.to_string()
147 );
148
149 tokio::spawn(transfer(conn.unwrap()?, intent, hello).map(|r| {
150 if let Err(e) = r {
151 error!("Failed to proxy: {e}");
152 }
153 }));
154
155 break;
156 }
157 }
158 Err(e) => match *e {
159 ReadPacketError::ConnectionClosed => {
160 break;
161 }
162 e => {
163 warn!("Error during login: {e}");
164 return Err(e.into());
165 }
166 },
167 }
168 }
169 }
170 ClientIntention::Transfer => {
171 warn!("Client attempted to join via transfer")
172 }
173 }
174
175 Ok(())
176}
177
178async fn transfer(
179 mut inbound: TcpStream,
180 intent: ServerboundIntention,
181 hello: ServerboundHello,
182) -> Result<(), Box<dyn Error>> {
183 let outbound = TcpStream::connect(PROXY_ADDR).await?;
184 let name = hello.name.clone();
185 outbound.set_nodelay(true)?;
186
187 // Repeat the intent and hello packet
188 // received earlier to the proxy target
189 let mut outbound_conn: Connection<ClientboundHandshakePacket, ServerboundHandshakePacket> =
190 Connection::wrap(outbound);
191 outbound_conn.write(intent).await?;
192
193 let mut outbound_conn = outbound_conn.login();
194 outbound_conn.write(hello).await?;
195
196 let mut outbound = outbound_conn.unwrap()?;
197
198 // Split the incoming and outgoing connections in
199 // halves and handle each pair on separate threads.
200 let (mut ri, mut wi) = inbound.split();
201 let (mut ro, mut wo) = outbound.split();
202
203 let client_to_server = async {
204 io::copy(&mut ri, &mut wo).await?;
205 wo.shutdown().await
206 };
207
208 let server_to_client = async {
209 io::copy(&mut ro, &mut wi).await?;
210 wi.shutdown().await
211 };
212
213 tokio::try_join!(client_to_server, server_to_client)?;
214 info!("Player \'{name}\' left the game");
215
216 Ok(())
217}Sourcepub fn into_split(self) -> (ReadConnection<R>, WriteConnection<W>)
pub fn into_split(self) -> (ReadConnection<R>, WriteConnection<W>)
Split the reader and writer into two objects.
This doesn’t allocate.
Sourcepub fn into_split_raw(self) -> (RawReadConnection, RawWriteConnection)
pub fn into_split_raw(self) -> (RawReadConnection, RawWriteConnection)
Split the reader and writer into the state-agnostic
RawReadConnection and RawWriteConnection types.
This is meant to help with some types of proxies.
Source§impl Connection<ClientboundHandshakePacket, ServerboundHandshakePacket>
impl Connection<ClientboundHandshakePacket, ServerboundHandshakePacket>
Sourcepub async fn new(address: &SocketAddr) -> Result<Self, ConnectionError>
pub async fn new(address: &SocketAddr) -> Result<Self, ConnectionError>
Create a new connection to the given address.
Examples found in repository?
205async fn proxy_conn(
206 mut client_conn: Connection<ServerboundLoginPacket, ClientboundLoginPacket>,
207) -> Result<(), Box<dyn Error>> {
208 // resolve TARGET_ADDR
209 let parsed_target_addr = ServerAddr::try_from(TARGET_ADDR).unwrap();
210 let resolved_target_addr = resolve_address(&parsed_target_addr).await?;
211
212 let mut server_conn = Connection::new(&resolved_target_addr).await?;
213
214 let account = if ACCOUNT.contains('@') {
215 Account::microsoft(ACCOUNT).await?
216 } else {
217 Account::offline(ACCOUNT)
218 };
219 println!("got account: {:?}", account);
220
221 server_conn
222 .write(ServerboundIntention {
223 protocol_version: PROTOCOL_VERSION,
224 hostname: parsed_target_addr.host,
225 port: parsed_target_addr.port,
226 intention: ClientIntention::Login,
227 })
228 .await?;
229 let mut server_conn = server_conn.login();
230
231 // login
232 server_conn
233 .write(ServerboundHello {
234 name: account.username().to_owned(),
235 profile_id: account.uuid(),
236 })
237 .await?;
238
239 let (server_conn, login_finished) = loop {
240 let packet = server_conn.read().await?;
241
242 println!("got packet: {:?}", packet);
243
244 match packet {
245 ClientboundLoginPacket::Hello(p) => {
246 debug!("Got encryption request");
247 let e = azalea_crypto::encrypt(&p.public_key, &p.challenge).unwrap();
248
249 if let Some(access_token) = account.access_token() {
250 // keep track of the number of times we tried
251 // authenticating so we can give up after too many
252 let mut attempts: usize = 1;
253
254 while let Err(e) = {
255 server_conn
256 .authenticate(&access_token, &account.uuid(), e.secret_key, &p, None)
257 .await
258 } {
259 if attempts >= 2 {
260 // if this is the second attempt and we failed
261 // both times, give up
262 return Err(e.into());
263 }
264 if matches!(
265 e,
266 ClientSessionServerError::InvalidSession
267 | ClientSessionServerError::ForbiddenOperation
268 ) {
269 // uh oh, we got an invalid session and have
270 // to reauthenticate now
271 account.refresh().await?;
272 } else {
273 return Err(e.into());
274 }
275 attempts += 1;
276 }
277 }
278
279 server_conn
280 .write(ServerboundKey {
281 key_bytes: e.encrypted_public_key,
282 encrypted_challenge: e.encrypted_challenge,
283 })
284 .await?;
285
286 server_conn.set_encryption_key(e.secret_key);
287 }
288 ClientboundLoginPacket::LoginCompression(p) => {
289 debug!("Got compression request {:?}", p.compression_threshold);
290 server_conn.set_compression_threshold(p.compression_threshold);
291 }
292 ClientboundLoginPacket::LoginFinished(p) => {
293 debug!(
294 "Got profile {:?}. handshake is finished and we're now switching to the configuration state",
295 p.game_profile
296 );
297 // server_conn.write(ServerboundLoginAcknowledged {}).await?;
298 break (server_conn.config(), p);
299 }
300 ClientboundLoginPacket::LoginDisconnect(p) => {
301 error!("Got disconnect {p:?}");
302 return Err("Disconnected".into());
303 }
304 ClientboundLoginPacket::CustomQuery(p) => {
305 debug!("Got custom query {:?}", p);
306 // replying to custom query is done in
307 // packet_handling::login::process_packet_events
308 }
309 ClientboundLoginPacket::CookieRequest(p) => {
310 debug!("Got cookie request {:?}", p);
311
312 server_conn
313 .write(packets::login::ServerboundCookieResponse {
314 key: p.key,
315 // cookies aren't implemented
316 payload: None,
317 })
318 .await?;
319 }
320 }
321 };
322
323 // give the client the login_finished
324 println!("got the login_finished: {:?}", login_finished);
325 client_conn.write(login_finished).await?;
326 let client_conn = client_conn.config();
327
328 info!("started direct bridging");
329
330 // bridge packets
331 let listen_raw_reader = client_conn.reader.raw;
332 let listen_raw_writer = client_conn.writer.raw;
333
334 let target_raw_reader = server_conn.reader.raw;
335 let target_raw_writer = server_conn.writer.raw;
336
337 let packet_logs_txt = Arc::new(tokio::sync::Mutex::new(
338 File::create("combined.txt").await.unwrap(),
339 ));
340
341 let packet_logs_txt_clone = packet_logs_txt.clone();
342 let copy_listen_to_target = tokio::spawn(async move {
343 let mut listen_raw_reader = listen_raw_reader;
344 let mut target_raw_writer = target_raw_writer;
345
346 let packet_logs_txt = packet_logs_txt_clone;
347
348 let mut serverbound_parsed_txt = File::create("serverbound.txt").await.unwrap();
349
350 loop {
351 let packet = match listen_raw_reader.read().await {
352 Ok(p) => p,
353 Err(e) => {
354 error!("Error reading packet from listen: {e}");
355 return;
356 }
357 };
358
359 // decode as a game packet
360 let decoded_packet = azalea_protocol::read::deserialize_packet::<ServerboundGamePacket>(
361 &mut Cursor::new(&packet),
362 );
363
364 if let Ok(decoded_packet) = decoded_packet {
365 let timestamp = chrono::Utc::now();
366 let _ = serverbound_parsed_txt
367 .write_all(format!("{timestamp} {:?}\n", decoded_packet).as_bytes())
368 .await;
369 let _ = packet_logs_txt
370 .lock()
371 .await
372 .write_all(format!("{timestamp} <- {:?}\n", decoded_packet).as_bytes())
373 .await;
374 }
375
376 match target_raw_writer.write(&packet).await {
377 Ok(_) => {}
378 Err(e) => {
379 error!("Error writing packet to target: {e}");
380 return;
381 }
382 }
383 }
384 });
385
386 // write to clientbound.txt in a separate task so it doesn't block receiving
387 // packets
388 let (clientbound_tx, mut clientbound_rx) = tokio::sync::mpsc::unbounded_channel::<Box<[u8]>>();
389 let copy_clientbound_to_file = tokio::spawn(async move {
390 let mut clientbound_parsed_txt = File::create("clientbound.txt").await.unwrap();
391
392 loop {
393 let Some(packet) = clientbound_rx.recv().await else {
394 return;
395 };
396
397 // decode as a game packet
398 let decoded_packet = azalea_protocol::read::deserialize_packet::<ClientboundGamePacket>(
399 &mut Cursor::new(&packet),
400 );
401
402 if let Ok(decoded_packet) = decoded_packet {
403 let timestamp = chrono::Utc::now();
404 let _ = clientbound_parsed_txt
405 .write_all(format!("{timestamp} {decoded_packet:?}\n").as_bytes())
406 .await;
407 let _ = packet_logs_txt
408 .lock()
409 .await
410 .write_all(format!("{timestamp} -> {decoded_packet:?}\n").as_bytes())
411 .await;
412 }
413 }
414 });
415
416 let copy_remote_to_local = tokio::spawn(async move {
417 let mut target_raw_reader = target_raw_reader;
418 let mut listen_raw_writer = listen_raw_writer;
419
420 loop {
421 let packet = match target_raw_reader.read().await {
422 Ok(p) => p,
423 Err(e) => {
424 error!("Error reading packet from target: {e}");
425 return;
426 }
427 };
428
429 clientbound_tx.send(packet.clone()).unwrap();
430
431 match listen_raw_writer.write(&packet).await {
432 Ok(_) => {}
433 Err(e) => {
434 error!("Error writing packet to listen: {e}");
435 return;
436 }
437 }
438 }
439 });
440
441 tokio::try_join!(
442 copy_listen_to_target,
443 copy_remote_to_local,
444 copy_clientbound_to_file
445 )?;
446
447 Ok(())
448}Sourcepub async fn new_with_proxy(
address: &SocketAddr,
proxy: Proxy,
) -> Result<Self, ConnectionError>
pub async fn new_with_proxy( address: &SocketAddr, proxy: Proxy, ) -> Result<Self, ConnectionError>
Create a new connection to the given address and SOCKS5 proxy.
If you’re not using a proxy, use Self::new instead.
Sourcepub async fn new_from_stream(stream: TcpStream) -> Result<Self, ConnectionError>
pub async fn new_from_stream(stream: TcpStream) -> Result<Self, ConnectionError>
Create a new connection from an existing stream.
Useful if you want to set custom options on the stream. Otherwise, just
use Self::new.
Sourcepub fn login(self) -> Connection<ClientboundLoginPacket, ServerboundLoginPacket>
pub fn login(self) -> Connection<ClientboundLoginPacket, ServerboundLoginPacket>
Change our state from handshake to login.
This is the state that is used for logging in.
Examples found in repository?
178async fn transfer(
179 mut inbound: TcpStream,
180 intent: ServerboundIntention,
181 hello: ServerboundHello,
182) -> Result<(), Box<dyn Error>> {
183 let outbound = TcpStream::connect(PROXY_ADDR).await?;
184 let name = hello.name.clone();
185 outbound.set_nodelay(true)?;
186
187 // Repeat the intent and hello packet
188 // received earlier to the proxy target
189 let mut outbound_conn: Connection<ClientboundHandshakePacket, ServerboundHandshakePacket> =
190 Connection::wrap(outbound);
191 outbound_conn.write(intent).await?;
192
193 let mut outbound_conn = outbound_conn.login();
194 outbound_conn.write(hello).await?;
195
196 let mut outbound = outbound_conn.unwrap()?;
197
198 // Split the incoming and outgoing connections in
199 // halves and handle each pair on separate threads.
200 let (mut ri, mut wi) = inbound.split();
201 let (mut ro, mut wo) = outbound.split();
202
203 let client_to_server = async {
204 io::copy(&mut ri, &mut wo).await?;
205 wo.shutdown().await
206 };
207
208 let server_to_client = async {
209 io::copy(&mut ro, &mut wi).await?;
210 wi.shutdown().await
211 };
212
213 tokio::try_join!(client_to_server, server_to_client)?;
214 info!("Player \'{name}\' left the game");
215
216 Ok(())
217}More examples
205async fn proxy_conn(
206 mut client_conn: Connection<ServerboundLoginPacket, ClientboundLoginPacket>,
207) -> Result<(), Box<dyn Error>> {
208 // resolve TARGET_ADDR
209 let parsed_target_addr = ServerAddr::try_from(TARGET_ADDR).unwrap();
210 let resolved_target_addr = resolve_address(&parsed_target_addr).await?;
211
212 let mut server_conn = Connection::new(&resolved_target_addr).await?;
213
214 let account = if ACCOUNT.contains('@') {
215 Account::microsoft(ACCOUNT).await?
216 } else {
217 Account::offline(ACCOUNT)
218 };
219 println!("got account: {:?}", account);
220
221 server_conn
222 .write(ServerboundIntention {
223 protocol_version: PROTOCOL_VERSION,
224 hostname: parsed_target_addr.host,
225 port: parsed_target_addr.port,
226 intention: ClientIntention::Login,
227 })
228 .await?;
229 let mut server_conn = server_conn.login();
230
231 // login
232 server_conn
233 .write(ServerboundHello {
234 name: account.username().to_owned(),
235 profile_id: account.uuid(),
236 })
237 .await?;
238
239 let (server_conn, login_finished) = loop {
240 let packet = server_conn.read().await?;
241
242 println!("got packet: {:?}", packet);
243
244 match packet {
245 ClientboundLoginPacket::Hello(p) => {
246 debug!("Got encryption request");
247 let e = azalea_crypto::encrypt(&p.public_key, &p.challenge).unwrap();
248
249 if let Some(access_token) = account.access_token() {
250 // keep track of the number of times we tried
251 // authenticating so we can give up after too many
252 let mut attempts: usize = 1;
253
254 while let Err(e) = {
255 server_conn
256 .authenticate(&access_token, &account.uuid(), e.secret_key, &p, None)
257 .await
258 } {
259 if attempts >= 2 {
260 // if this is the second attempt and we failed
261 // both times, give up
262 return Err(e.into());
263 }
264 if matches!(
265 e,
266 ClientSessionServerError::InvalidSession
267 | ClientSessionServerError::ForbiddenOperation
268 ) {
269 // uh oh, we got an invalid session and have
270 // to reauthenticate now
271 account.refresh().await?;
272 } else {
273 return Err(e.into());
274 }
275 attempts += 1;
276 }
277 }
278
279 server_conn
280 .write(ServerboundKey {
281 key_bytes: e.encrypted_public_key,
282 encrypted_challenge: e.encrypted_challenge,
283 })
284 .await?;
285
286 server_conn.set_encryption_key(e.secret_key);
287 }
288 ClientboundLoginPacket::LoginCompression(p) => {
289 debug!("Got compression request {:?}", p.compression_threshold);
290 server_conn.set_compression_threshold(p.compression_threshold);
291 }
292 ClientboundLoginPacket::LoginFinished(p) => {
293 debug!(
294 "Got profile {:?}. handshake is finished and we're now switching to the configuration state",
295 p.game_profile
296 );
297 // server_conn.write(ServerboundLoginAcknowledged {}).await?;
298 break (server_conn.config(), p);
299 }
300 ClientboundLoginPacket::LoginDisconnect(p) => {
301 error!("Got disconnect {p:?}");
302 return Err("Disconnected".into());
303 }
304 ClientboundLoginPacket::CustomQuery(p) => {
305 debug!("Got custom query {:?}", p);
306 // replying to custom query is done in
307 // packet_handling::login::process_packet_events
308 }
309 ClientboundLoginPacket::CookieRequest(p) => {
310 debug!("Got cookie request {:?}", p);
311
312 server_conn
313 .write(packets::login::ServerboundCookieResponse {
314 key: p.key,
315 // cookies aren't implemented
316 payload: None,
317 })
318 .await?;
319 }
320 }
321 };
322
323 // give the client the login_finished
324 println!("got the login_finished: {:?}", login_finished);
325 client_conn.write(login_finished).await?;
326 let client_conn = client_conn.config();
327
328 info!("started direct bridging");
329
330 // bridge packets
331 let listen_raw_reader = client_conn.reader.raw;
332 let listen_raw_writer = client_conn.writer.raw;
333
334 let target_raw_reader = server_conn.reader.raw;
335 let target_raw_writer = server_conn.writer.raw;
336
337 let packet_logs_txt = Arc::new(tokio::sync::Mutex::new(
338 File::create("combined.txt").await.unwrap(),
339 ));
340
341 let packet_logs_txt_clone = packet_logs_txt.clone();
342 let copy_listen_to_target = tokio::spawn(async move {
343 let mut listen_raw_reader = listen_raw_reader;
344 let mut target_raw_writer = target_raw_writer;
345
346 let packet_logs_txt = packet_logs_txt_clone;
347
348 let mut serverbound_parsed_txt = File::create("serverbound.txt").await.unwrap();
349
350 loop {
351 let packet = match listen_raw_reader.read().await {
352 Ok(p) => p,
353 Err(e) => {
354 error!("Error reading packet from listen: {e}");
355 return;
356 }
357 };
358
359 // decode as a game packet
360 let decoded_packet = azalea_protocol::read::deserialize_packet::<ServerboundGamePacket>(
361 &mut Cursor::new(&packet),
362 );
363
364 if let Ok(decoded_packet) = decoded_packet {
365 let timestamp = chrono::Utc::now();
366 let _ = serverbound_parsed_txt
367 .write_all(format!("{timestamp} {:?}\n", decoded_packet).as_bytes())
368 .await;
369 let _ = packet_logs_txt
370 .lock()
371 .await
372 .write_all(format!("{timestamp} <- {:?}\n", decoded_packet).as_bytes())
373 .await;
374 }
375
376 match target_raw_writer.write(&packet).await {
377 Ok(_) => {}
378 Err(e) => {
379 error!("Error writing packet to target: {e}");
380 return;
381 }
382 }
383 }
384 });
385
386 // write to clientbound.txt in a separate task so it doesn't block receiving
387 // packets
388 let (clientbound_tx, mut clientbound_rx) = tokio::sync::mpsc::unbounded_channel::<Box<[u8]>>();
389 let copy_clientbound_to_file = tokio::spawn(async move {
390 let mut clientbound_parsed_txt = File::create("clientbound.txt").await.unwrap();
391
392 loop {
393 let Some(packet) = clientbound_rx.recv().await else {
394 return;
395 };
396
397 // decode as a game packet
398 let decoded_packet = azalea_protocol::read::deserialize_packet::<ClientboundGamePacket>(
399 &mut Cursor::new(&packet),
400 );
401
402 if let Ok(decoded_packet) = decoded_packet {
403 let timestamp = chrono::Utc::now();
404 let _ = clientbound_parsed_txt
405 .write_all(format!("{timestamp} {decoded_packet:?}\n").as_bytes())
406 .await;
407 let _ = packet_logs_txt
408 .lock()
409 .await
410 .write_all(format!("{timestamp} -> {decoded_packet:?}\n").as_bytes())
411 .await;
412 }
413 }
414 });
415
416 let copy_remote_to_local = tokio::spawn(async move {
417 let mut target_raw_reader = target_raw_reader;
418 let mut listen_raw_writer = listen_raw_writer;
419
420 loop {
421 let packet = match target_raw_reader.read().await {
422 Ok(p) => p,
423 Err(e) => {
424 error!("Error reading packet from target: {e}");
425 return;
426 }
427 };
428
429 clientbound_tx.send(packet.clone()).unwrap();
430
431 match listen_raw_writer.write(&packet).await {
432 Ok(_) => {}
433 Err(e) => {
434 error!("Error writing packet to listen: {e}");
435 return;
436 }
437 }
438 }
439 });
440
441 tokio::try_join!(
442 copy_listen_to_target,
443 copy_remote_to_local,
444 copy_clientbound_to_file
445 )?;
446
447 Ok(())
448}Sourcepub fn status(
self,
) -> Connection<ClientboundStatusPacket, ServerboundStatusPacket>
pub fn status( self, ) -> Connection<ClientboundStatusPacket, ServerboundStatusPacket>
Change our state from handshake to status.
This is the state that is used for pinging the server.
Source§impl Connection<ClientboundLoginPacket, ServerboundLoginPacket>
impl Connection<ClientboundLoginPacket, ServerboundLoginPacket>
Sourcepub fn set_compression_threshold(&mut self, threshold: i32)
pub fn set_compression_threshold(&mut self, threshold: i32)
Set our compression threshold, i.e. the maximum size that a packet is allowed to be without getting compressed.
Setting it to 0 means every packet will be compressed. If you set it to less than 0 then compression is disabled.
Examples found in repository?
205async fn proxy_conn(
206 mut client_conn: Connection<ServerboundLoginPacket, ClientboundLoginPacket>,
207) -> Result<(), Box<dyn Error>> {
208 // resolve TARGET_ADDR
209 let parsed_target_addr = ServerAddr::try_from(TARGET_ADDR).unwrap();
210 let resolved_target_addr = resolve_address(&parsed_target_addr).await?;
211
212 let mut server_conn = Connection::new(&resolved_target_addr).await?;
213
214 let account = if ACCOUNT.contains('@') {
215 Account::microsoft(ACCOUNT).await?
216 } else {
217 Account::offline(ACCOUNT)
218 };
219 println!("got account: {:?}", account);
220
221 server_conn
222 .write(ServerboundIntention {
223 protocol_version: PROTOCOL_VERSION,
224 hostname: parsed_target_addr.host,
225 port: parsed_target_addr.port,
226 intention: ClientIntention::Login,
227 })
228 .await?;
229 let mut server_conn = server_conn.login();
230
231 // login
232 server_conn
233 .write(ServerboundHello {
234 name: account.username().to_owned(),
235 profile_id: account.uuid(),
236 })
237 .await?;
238
239 let (server_conn, login_finished) = loop {
240 let packet = server_conn.read().await?;
241
242 println!("got packet: {:?}", packet);
243
244 match packet {
245 ClientboundLoginPacket::Hello(p) => {
246 debug!("Got encryption request");
247 let e = azalea_crypto::encrypt(&p.public_key, &p.challenge).unwrap();
248
249 if let Some(access_token) = account.access_token() {
250 // keep track of the number of times we tried
251 // authenticating so we can give up after too many
252 let mut attempts: usize = 1;
253
254 while let Err(e) = {
255 server_conn
256 .authenticate(&access_token, &account.uuid(), e.secret_key, &p, None)
257 .await
258 } {
259 if attempts >= 2 {
260 // if this is the second attempt and we failed
261 // both times, give up
262 return Err(e.into());
263 }
264 if matches!(
265 e,
266 ClientSessionServerError::InvalidSession
267 | ClientSessionServerError::ForbiddenOperation
268 ) {
269 // uh oh, we got an invalid session and have
270 // to reauthenticate now
271 account.refresh().await?;
272 } else {
273 return Err(e.into());
274 }
275 attempts += 1;
276 }
277 }
278
279 server_conn
280 .write(ServerboundKey {
281 key_bytes: e.encrypted_public_key,
282 encrypted_challenge: e.encrypted_challenge,
283 })
284 .await?;
285
286 server_conn.set_encryption_key(e.secret_key);
287 }
288 ClientboundLoginPacket::LoginCompression(p) => {
289 debug!("Got compression request {:?}", p.compression_threshold);
290 server_conn.set_compression_threshold(p.compression_threshold);
291 }
292 ClientboundLoginPacket::LoginFinished(p) => {
293 debug!(
294 "Got profile {:?}. handshake is finished and we're now switching to the configuration state",
295 p.game_profile
296 );
297 // server_conn.write(ServerboundLoginAcknowledged {}).await?;
298 break (server_conn.config(), p);
299 }
300 ClientboundLoginPacket::LoginDisconnect(p) => {
301 error!("Got disconnect {p:?}");
302 return Err("Disconnected".into());
303 }
304 ClientboundLoginPacket::CustomQuery(p) => {
305 debug!("Got custom query {:?}", p);
306 // replying to custom query is done in
307 // packet_handling::login::process_packet_events
308 }
309 ClientboundLoginPacket::CookieRequest(p) => {
310 debug!("Got cookie request {:?}", p);
311
312 server_conn
313 .write(packets::login::ServerboundCookieResponse {
314 key: p.key,
315 // cookies aren't implemented
316 payload: None,
317 })
318 .await?;
319 }
320 }
321 };
322
323 // give the client the login_finished
324 println!("got the login_finished: {:?}", login_finished);
325 client_conn.write(login_finished).await?;
326 let client_conn = client_conn.config();
327
328 info!("started direct bridging");
329
330 // bridge packets
331 let listen_raw_reader = client_conn.reader.raw;
332 let listen_raw_writer = client_conn.writer.raw;
333
334 let target_raw_reader = server_conn.reader.raw;
335 let target_raw_writer = server_conn.writer.raw;
336
337 let packet_logs_txt = Arc::new(tokio::sync::Mutex::new(
338 File::create("combined.txt").await.unwrap(),
339 ));
340
341 let packet_logs_txt_clone = packet_logs_txt.clone();
342 let copy_listen_to_target = tokio::spawn(async move {
343 let mut listen_raw_reader = listen_raw_reader;
344 let mut target_raw_writer = target_raw_writer;
345
346 let packet_logs_txt = packet_logs_txt_clone;
347
348 let mut serverbound_parsed_txt = File::create("serverbound.txt").await.unwrap();
349
350 loop {
351 let packet = match listen_raw_reader.read().await {
352 Ok(p) => p,
353 Err(e) => {
354 error!("Error reading packet from listen: {e}");
355 return;
356 }
357 };
358
359 // decode as a game packet
360 let decoded_packet = azalea_protocol::read::deserialize_packet::<ServerboundGamePacket>(
361 &mut Cursor::new(&packet),
362 );
363
364 if let Ok(decoded_packet) = decoded_packet {
365 let timestamp = chrono::Utc::now();
366 let _ = serverbound_parsed_txt
367 .write_all(format!("{timestamp} {:?}\n", decoded_packet).as_bytes())
368 .await;
369 let _ = packet_logs_txt
370 .lock()
371 .await
372 .write_all(format!("{timestamp} <- {:?}\n", decoded_packet).as_bytes())
373 .await;
374 }
375
376 match target_raw_writer.write(&packet).await {
377 Ok(_) => {}
378 Err(e) => {
379 error!("Error writing packet to target: {e}");
380 return;
381 }
382 }
383 }
384 });
385
386 // write to clientbound.txt in a separate task so it doesn't block receiving
387 // packets
388 let (clientbound_tx, mut clientbound_rx) = tokio::sync::mpsc::unbounded_channel::<Box<[u8]>>();
389 let copy_clientbound_to_file = tokio::spawn(async move {
390 let mut clientbound_parsed_txt = File::create("clientbound.txt").await.unwrap();
391
392 loop {
393 let Some(packet) = clientbound_rx.recv().await else {
394 return;
395 };
396
397 // decode as a game packet
398 let decoded_packet = azalea_protocol::read::deserialize_packet::<ClientboundGamePacket>(
399 &mut Cursor::new(&packet),
400 );
401
402 if let Ok(decoded_packet) = decoded_packet {
403 let timestamp = chrono::Utc::now();
404 let _ = clientbound_parsed_txt
405 .write_all(format!("{timestamp} {decoded_packet:?}\n").as_bytes())
406 .await;
407 let _ = packet_logs_txt
408 .lock()
409 .await
410 .write_all(format!("{timestamp} -> {decoded_packet:?}\n").as_bytes())
411 .await;
412 }
413 }
414 });
415
416 let copy_remote_to_local = tokio::spawn(async move {
417 let mut target_raw_reader = target_raw_reader;
418 let mut listen_raw_writer = listen_raw_writer;
419
420 loop {
421 let packet = match target_raw_reader.read().await {
422 Ok(p) => p,
423 Err(e) => {
424 error!("Error reading packet from target: {e}");
425 return;
426 }
427 };
428
429 clientbound_tx.send(packet.clone()).unwrap();
430
431 match listen_raw_writer.write(&packet).await {
432 Ok(_) => {}
433 Err(e) => {
434 error!("Error writing packet to listen: {e}");
435 return;
436 }
437 }
438 }
439 });
440
441 tokio::try_join!(
442 copy_listen_to_target,
443 copy_remote_to_local,
444 copy_clientbound_to_file
445 )?;
446
447 Ok(())
448}Sourcepub fn set_encryption_key(&mut self, key: [u8; 16])
pub fn set_encryption_key(&mut self, key: [u8; 16])
Set the encryption key that is used to encrypt and decrypt packets.
It’s the same for both reading and writing.
Examples found in repository?
205async fn proxy_conn(
206 mut client_conn: Connection<ServerboundLoginPacket, ClientboundLoginPacket>,
207) -> Result<(), Box<dyn Error>> {
208 // resolve TARGET_ADDR
209 let parsed_target_addr = ServerAddr::try_from(TARGET_ADDR).unwrap();
210 let resolved_target_addr = resolve_address(&parsed_target_addr).await?;
211
212 let mut server_conn = Connection::new(&resolved_target_addr).await?;
213
214 let account = if ACCOUNT.contains('@') {
215 Account::microsoft(ACCOUNT).await?
216 } else {
217 Account::offline(ACCOUNT)
218 };
219 println!("got account: {:?}", account);
220
221 server_conn
222 .write(ServerboundIntention {
223 protocol_version: PROTOCOL_VERSION,
224 hostname: parsed_target_addr.host,
225 port: parsed_target_addr.port,
226 intention: ClientIntention::Login,
227 })
228 .await?;
229 let mut server_conn = server_conn.login();
230
231 // login
232 server_conn
233 .write(ServerboundHello {
234 name: account.username().to_owned(),
235 profile_id: account.uuid(),
236 })
237 .await?;
238
239 let (server_conn, login_finished) = loop {
240 let packet = server_conn.read().await?;
241
242 println!("got packet: {:?}", packet);
243
244 match packet {
245 ClientboundLoginPacket::Hello(p) => {
246 debug!("Got encryption request");
247 let e = azalea_crypto::encrypt(&p.public_key, &p.challenge).unwrap();
248
249 if let Some(access_token) = account.access_token() {
250 // keep track of the number of times we tried
251 // authenticating so we can give up after too many
252 let mut attempts: usize = 1;
253
254 while let Err(e) = {
255 server_conn
256 .authenticate(&access_token, &account.uuid(), e.secret_key, &p, None)
257 .await
258 } {
259 if attempts >= 2 {
260 // if this is the second attempt and we failed
261 // both times, give up
262 return Err(e.into());
263 }
264 if matches!(
265 e,
266 ClientSessionServerError::InvalidSession
267 | ClientSessionServerError::ForbiddenOperation
268 ) {
269 // uh oh, we got an invalid session and have
270 // to reauthenticate now
271 account.refresh().await?;
272 } else {
273 return Err(e.into());
274 }
275 attempts += 1;
276 }
277 }
278
279 server_conn
280 .write(ServerboundKey {
281 key_bytes: e.encrypted_public_key,
282 encrypted_challenge: e.encrypted_challenge,
283 })
284 .await?;
285
286 server_conn.set_encryption_key(e.secret_key);
287 }
288 ClientboundLoginPacket::LoginCompression(p) => {
289 debug!("Got compression request {:?}", p.compression_threshold);
290 server_conn.set_compression_threshold(p.compression_threshold);
291 }
292 ClientboundLoginPacket::LoginFinished(p) => {
293 debug!(
294 "Got profile {:?}. handshake is finished and we're now switching to the configuration state",
295 p.game_profile
296 );
297 // server_conn.write(ServerboundLoginAcknowledged {}).await?;
298 break (server_conn.config(), p);
299 }
300 ClientboundLoginPacket::LoginDisconnect(p) => {
301 error!("Got disconnect {p:?}");
302 return Err("Disconnected".into());
303 }
304 ClientboundLoginPacket::CustomQuery(p) => {
305 debug!("Got custom query {:?}", p);
306 // replying to custom query is done in
307 // packet_handling::login::process_packet_events
308 }
309 ClientboundLoginPacket::CookieRequest(p) => {
310 debug!("Got cookie request {:?}", p);
311
312 server_conn
313 .write(packets::login::ServerboundCookieResponse {
314 key: p.key,
315 // cookies aren't implemented
316 payload: None,
317 })
318 .await?;
319 }
320 }
321 };
322
323 // give the client the login_finished
324 println!("got the login_finished: {:?}", login_finished);
325 client_conn.write(login_finished).await?;
326 let client_conn = client_conn.config();
327
328 info!("started direct bridging");
329
330 // bridge packets
331 let listen_raw_reader = client_conn.reader.raw;
332 let listen_raw_writer = client_conn.writer.raw;
333
334 let target_raw_reader = server_conn.reader.raw;
335 let target_raw_writer = server_conn.writer.raw;
336
337 let packet_logs_txt = Arc::new(tokio::sync::Mutex::new(
338 File::create("combined.txt").await.unwrap(),
339 ));
340
341 let packet_logs_txt_clone = packet_logs_txt.clone();
342 let copy_listen_to_target = tokio::spawn(async move {
343 let mut listen_raw_reader = listen_raw_reader;
344 let mut target_raw_writer = target_raw_writer;
345
346 let packet_logs_txt = packet_logs_txt_clone;
347
348 let mut serverbound_parsed_txt = File::create("serverbound.txt").await.unwrap();
349
350 loop {
351 let packet = match listen_raw_reader.read().await {
352 Ok(p) => p,
353 Err(e) => {
354 error!("Error reading packet from listen: {e}");
355 return;
356 }
357 };
358
359 // decode as a game packet
360 let decoded_packet = azalea_protocol::read::deserialize_packet::<ServerboundGamePacket>(
361 &mut Cursor::new(&packet),
362 );
363
364 if let Ok(decoded_packet) = decoded_packet {
365 let timestamp = chrono::Utc::now();
366 let _ = serverbound_parsed_txt
367 .write_all(format!("{timestamp} {:?}\n", decoded_packet).as_bytes())
368 .await;
369 let _ = packet_logs_txt
370 .lock()
371 .await
372 .write_all(format!("{timestamp} <- {:?}\n", decoded_packet).as_bytes())
373 .await;
374 }
375
376 match target_raw_writer.write(&packet).await {
377 Ok(_) => {}
378 Err(e) => {
379 error!("Error writing packet to target: {e}");
380 return;
381 }
382 }
383 }
384 });
385
386 // write to clientbound.txt in a separate task so it doesn't block receiving
387 // packets
388 let (clientbound_tx, mut clientbound_rx) = tokio::sync::mpsc::unbounded_channel::<Box<[u8]>>();
389 let copy_clientbound_to_file = tokio::spawn(async move {
390 let mut clientbound_parsed_txt = File::create("clientbound.txt").await.unwrap();
391
392 loop {
393 let Some(packet) = clientbound_rx.recv().await else {
394 return;
395 };
396
397 // decode as a game packet
398 let decoded_packet = azalea_protocol::read::deserialize_packet::<ClientboundGamePacket>(
399 &mut Cursor::new(&packet),
400 );
401
402 if let Ok(decoded_packet) = decoded_packet {
403 let timestamp = chrono::Utc::now();
404 let _ = clientbound_parsed_txt
405 .write_all(format!("{timestamp} {decoded_packet:?}\n").as_bytes())
406 .await;
407 let _ = packet_logs_txt
408 .lock()
409 .await
410 .write_all(format!("{timestamp} -> {decoded_packet:?}\n").as_bytes())
411 .await;
412 }
413 }
414 });
415
416 let copy_remote_to_local = tokio::spawn(async move {
417 let mut target_raw_reader = target_raw_reader;
418 let mut listen_raw_writer = listen_raw_writer;
419
420 loop {
421 let packet = match target_raw_reader.read().await {
422 Ok(p) => p,
423 Err(e) => {
424 error!("Error reading packet from target: {e}");
425 return;
426 }
427 };
428
429 clientbound_tx.send(packet.clone()).unwrap();
430
431 match listen_raw_writer.write(&packet).await {
432 Ok(_) => {}
433 Err(e) => {
434 error!("Error writing packet to listen: {e}");
435 return;
436 }
437 }
438 }
439 });
440
441 tokio::try_join!(
442 copy_listen_to_target,
443 copy_remote_to_local,
444 copy_clientbound_to_file
445 )?;
446
447 Ok(())
448}Sourcepub fn config(
self,
) -> Connection<ClientboundConfigPacket, ServerboundConfigPacket>
pub fn config( self, ) -> Connection<ClientboundConfigPacket, ServerboundConfigPacket>
Change our state from login to configuration.
This is the state where the server sends us the registries and the resource pack.
Examples found in repository?
205async fn proxy_conn(
206 mut client_conn: Connection<ServerboundLoginPacket, ClientboundLoginPacket>,
207) -> Result<(), Box<dyn Error>> {
208 // resolve TARGET_ADDR
209 let parsed_target_addr = ServerAddr::try_from(TARGET_ADDR).unwrap();
210 let resolved_target_addr = resolve_address(&parsed_target_addr).await?;
211
212 let mut server_conn = Connection::new(&resolved_target_addr).await?;
213
214 let account = if ACCOUNT.contains('@') {
215 Account::microsoft(ACCOUNT).await?
216 } else {
217 Account::offline(ACCOUNT)
218 };
219 println!("got account: {:?}", account);
220
221 server_conn
222 .write(ServerboundIntention {
223 protocol_version: PROTOCOL_VERSION,
224 hostname: parsed_target_addr.host,
225 port: parsed_target_addr.port,
226 intention: ClientIntention::Login,
227 })
228 .await?;
229 let mut server_conn = server_conn.login();
230
231 // login
232 server_conn
233 .write(ServerboundHello {
234 name: account.username().to_owned(),
235 profile_id: account.uuid(),
236 })
237 .await?;
238
239 let (server_conn, login_finished) = loop {
240 let packet = server_conn.read().await?;
241
242 println!("got packet: {:?}", packet);
243
244 match packet {
245 ClientboundLoginPacket::Hello(p) => {
246 debug!("Got encryption request");
247 let e = azalea_crypto::encrypt(&p.public_key, &p.challenge).unwrap();
248
249 if let Some(access_token) = account.access_token() {
250 // keep track of the number of times we tried
251 // authenticating so we can give up after too many
252 let mut attempts: usize = 1;
253
254 while let Err(e) = {
255 server_conn
256 .authenticate(&access_token, &account.uuid(), e.secret_key, &p, None)
257 .await
258 } {
259 if attempts >= 2 {
260 // if this is the second attempt and we failed
261 // both times, give up
262 return Err(e.into());
263 }
264 if matches!(
265 e,
266 ClientSessionServerError::InvalidSession
267 | ClientSessionServerError::ForbiddenOperation
268 ) {
269 // uh oh, we got an invalid session and have
270 // to reauthenticate now
271 account.refresh().await?;
272 } else {
273 return Err(e.into());
274 }
275 attempts += 1;
276 }
277 }
278
279 server_conn
280 .write(ServerboundKey {
281 key_bytes: e.encrypted_public_key,
282 encrypted_challenge: e.encrypted_challenge,
283 })
284 .await?;
285
286 server_conn.set_encryption_key(e.secret_key);
287 }
288 ClientboundLoginPacket::LoginCompression(p) => {
289 debug!("Got compression request {:?}", p.compression_threshold);
290 server_conn.set_compression_threshold(p.compression_threshold);
291 }
292 ClientboundLoginPacket::LoginFinished(p) => {
293 debug!(
294 "Got profile {:?}. handshake is finished and we're now switching to the configuration state",
295 p.game_profile
296 );
297 // server_conn.write(ServerboundLoginAcknowledged {}).await?;
298 break (server_conn.config(), p);
299 }
300 ClientboundLoginPacket::LoginDisconnect(p) => {
301 error!("Got disconnect {p:?}");
302 return Err("Disconnected".into());
303 }
304 ClientboundLoginPacket::CustomQuery(p) => {
305 debug!("Got custom query {:?}", p);
306 // replying to custom query is done in
307 // packet_handling::login::process_packet_events
308 }
309 ClientboundLoginPacket::CookieRequest(p) => {
310 debug!("Got cookie request {:?}", p);
311
312 server_conn
313 .write(packets::login::ServerboundCookieResponse {
314 key: p.key,
315 // cookies aren't implemented
316 payload: None,
317 })
318 .await?;
319 }
320 }
321 };
322
323 // give the client the login_finished
324 println!("got the login_finished: {:?}", login_finished);
325 client_conn.write(login_finished).await?;
326 let client_conn = client_conn.config();
327
328 info!("started direct bridging");
329
330 // bridge packets
331 let listen_raw_reader = client_conn.reader.raw;
332 let listen_raw_writer = client_conn.writer.raw;
333
334 let target_raw_reader = server_conn.reader.raw;
335 let target_raw_writer = server_conn.writer.raw;
336
337 let packet_logs_txt = Arc::new(tokio::sync::Mutex::new(
338 File::create("combined.txt").await.unwrap(),
339 ));
340
341 let packet_logs_txt_clone = packet_logs_txt.clone();
342 let copy_listen_to_target = tokio::spawn(async move {
343 let mut listen_raw_reader = listen_raw_reader;
344 let mut target_raw_writer = target_raw_writer;
345
346 let packet_logs_txt = packet_logs_txt_clone;
347
348 let mut serverbound_parsed_txt = File::create("serverbound.txt").await.unwrap();
349
350 loop {
351 let packet = match listen_raw_reader.read().await {
352 Ok(p) => p,
353 Err(e) => {
354 error!("Error reading packet from listen: {e}");
355 return;
356 }
357 };
358
359 // decode as a game packet
360 let decoded_packet = azalea_protocol::read::deserialize_packet::<ServerboundGamePacket>(
361 &mut Cursor::new(&packet),
362 );
363
364 if let Ok(decoded_packet) = decoded_packet {
365 let timestamp = chrono::Utc::now();
366 let _ = serverbound_parsed_txt
367 .write_all(format!("{timestamp} {:?}\n", decoded_packet).as_bytes())
368 .await;
369 let _ = packet_logs_txt
370 .lock()
371 .await
372 .write_all(format!("{timestamp} <- {:?}\n", decoded_packet).as_bytes())
373 .await;
374 }
375
376 match target_raw_writer.write(&packet).await {
377 Ok(_) => {}
378 Err(e) => {
379 error!("Error writing packet to target: {e}");
380 return;
381 }
382 }
383 }
384 });
385
386 // write to clientbound.txt in a separate task so it doesn't block receiving
387 // packets
388 let (clientbound_tx, mut clientbound_rx) = tokio::sync::mpsc::unbounded_channel::<Box<[u8]>>();
389 let copy_clientbound_to_file = tokio::spawn(async move {
390 let mut clientbound_parsed_txt = File::create("clientbound.txt").await.unwrap();
391
392 loop {
393 let Some(packet) = clientbound_rx.recv().await else {
394 return;
395 };
396
397 // decode as a game packet
398 let decoded_packet = azalea_protocol::read::deserialize_packet::<ClientboundGamePacket>(
399 &mut Cursor::new(&packet),
400 );
401
402 if let Ok(decoded_packet) = decoded_packet {
403 let timestamp = chrono::Utc::now();
404 let _ = clientbound_parsed_txt
405 .write_all(format!("{timestamp} {decoded_packet:?}\n").as_bytes())
406 .await;
407 let _ = packet_logs_txt
408 .lock()
409 .await
410 .write_all(format!("{timestamp} -> {decoded_packet:?}\n").as_bytes())
411 .await;
412 }
413 }
414 });
415
416 let copy_remote_to_local = tokio::spawn(async move {
417 let mut target_raw_reader = target_raw_reader;
418 let mut listen_raw_writer = listen_raw_writer;
419
420 loop {
421 let packet = match target_raw_reader.read().await {
422 Ok(p) => p,
423 Err(e) => {
424 error!("Error reading packet from target: {e}");
425 return;
426 }
427 };
428
429 clientbound_tx.send(packet.clone()).unwrap();
430
431 match listen_raw_writer.write(&packet).await {
432 Ok(_) => {}
433 Err(e) => {
434 error!("Error writing packet to listen: {e}");
435 return;
436 }
437 }
438 }
439 });
440
441 tokio::try_join!(
442 copy_listen_to_target,
443 copy_remote_to_local,
444 copy_clientbound_to_file
445 )?;
446
447 Ok(())
448}Sourcepub async fn authenticate(
&self,
access_token: &str,
uuid: &Uuid,
private_key: [u8; 16],
packet: &ClientboundHello,
sessionserver_proxy: Option<Proxy>,
) -> Result<(), ClientSessionServerError>
pub async fn authenticate( &self, access_token: &str, uuid: &Uuid, private_key: [u8; 16], packet: &ClientboundHello, sessionserver_proxy: Option<Proxy>, ) -> Result<(), ClientSessionServerError>
Authenticate with Minecraft’s servers, which is required to join online-mode servers.
This must happen when you get a ClientboundLoginPacket::Hello packet.
§Examples
use azalea_auth::AuthResult;
use azalea_protocol::{
connect::Connection,
packets::login::{ClientboundLoginPacket, ServerboundKey},
};
use uuid::Uuid;
let AuthResult {
access_token,
profile,
} = azalea_auth::auth("[email protected]", azalea_auth::AuthOpts::default())
.await
.expect("Couldn't authenticate");
let mut conn = Connection::new(&resolved_address).await?;
// transition to the login state, in a real program we would have done a handshake first
let mut conn = conn.login();
match conn.read().await? {
ClientboundLoginPacket::Hello(p) => {
// tell Mojang we're joining the server & enable encryption
let e = azalea_crypto::encrypt(&p.public_key, &p.challenge).unwrap();
conn.authenticate(&access_token, &profile.id, e.secret_key, &p, None)
.await?;
conn.write(ServerboundKey {
key_bytes: e.encrypted_public_key,
encrypted_challenge: e.encrypted_challenge,
})
.await?;
conn.set_encryption_key(e.secret_key);
}
_ => {}
}Examples found in repository?
205async fn proxy_conn(
206 mut client_conn: Connection<ServerboundLoginPacket, ClientboundLoginPacket>,
207) -> Result<(), Box<dyn Error>> {
208 // resolve TARGET_ADDR
209 let parsed_target_addr = ServerAddr::try_from(TARGET_ADDR).unwrap();
210 let resolved_target_addr = resolve_address(&parsed_target_addr).await?;
211
212 let mut server_conn = Connection::new(&resolved_target_addr).await?;
213
214 let account = if ACCOUNT.contains('@') {
215 Account::microsoft(ACCOUNT).await?
216 } else {
217 Account::offline(ACCOUNT)
218 };
219 println!("got account: {:?}", account);
220
221 server_conn
222 .write(ServerboundIntention {
223 protocol_version: PROTOCOL_VERSION,
224 hostname: parsed_target_addr.host,
225 port: parsed_target_addr.port,
226 intention: ClientIntention::Login,
227 })
228 .await?;
229 let mut server_conn = server_conn.login();
230
231 // login
232 server_conn
233 .write(ServerboundHello {
234 name: account.username().to_owned(),
235 profile_id: account.uuid(),
236 })
237 .await?;
238
239 let (server_conn, login_finished) = loop {
240 let packet = server_conn.read().await?;
241
242 println!("got packet: {:?}", packet);
243
244 match packet {
245 ClientboundLoginPacket::Hello(p) => {
246 debug!("Got encryption request");
247 let e = azalea_crypto::encrypt(&p.public_key, &p.challenge).unwrap();
248
249 if let Some(access_token) = account.access_token() {
250 // keep track of the number of times we tried
251 // authenticating so we can give up after too many
252 let mut attempts: usize = 1;
253
254 while let Err(e) = {
255 server_conn
256 .authenticate(&access_token, &account.uuid(), e.secret_key, &p, None)
257 .await
258 } {
259 if attempts >= 2 {
260 // if this is the second attempt and we failed
261 // both times, give up
262 return Err(e.into());
263 }
264 if matches!(
265 e,
266 ClientSessionServerError::InvalidSession
267 | ClientSessionServerError::ForbiddenOperation
268 ) {
269 // uh oh, we got an invalid session and have
270 // to reauthenticate now
271 account.refresh().await?;
272 } else {
273 return Err(e.into());
274 }
275 attempts += 1;
276 }
277 }
278
279 server_conn
280 .write(ServerboundKey {
281 key_bytes: e.encrypted_public_key,
282 encrypted_challenge: e.encrypted_challenge,
283 })
284 .await?;
285
286 server_conn.set_encryption_key(e.secret_key);
287 }
288 ClientboundLoginPacket::LoginCompression(p) => {
289 debug!("Got compression request {:?}", p.compression_threshold);
290 server_conn.set_compression_threshold(p.compression_threshold);
291 }
292 ClientboundLoginPacket::LoginFinished(p) => {
293 debug!(
294 "Got profile {:?}. handshake is finished and we're now switching to the configuration state",
295 p.game_profile
296 );
297 // server_conn.write(ServerboundLoginAcknowledged {}).await?;
298 break (server_conn.config(), p);
299 }
300 ClientboundLoginPacket::LoginDisconnect(p) => {
301 error!("Got disconnect {p:?}");
302 return Err("Disconnected".into());
303 }
304 ClientboundLoginPacket::CustomQuery(p) => {
305 debug!("Got custom query {:?}", p);
306 // replying to custom query is done in
307 // packet_handling::login::process_packet_events
308 }
309 ClientboundLoginPacket::CookieRequest(p) => {
310 debug!("Got cookie request {:?}", p);
311
312 server_conn
313 .write(packets::login::ServerboundCookieResponse {
314 key: p.key,
315 // cookies aren't implemented
316 payload: None,
317 })
318 .await?;
319 }
320 }
321 };
322
323 // give the client the login_finished
324 println!("got the login_finished: {:?}", login_finished);
325 client_conn.write(login_finished).await?;
326 let client_conn = client_conn.config();
327
328 info!("started direct bridging");
329
330 // bridge packets
331 let listen_raw_reader = client_conn.reader.raw;
332 let listen_raw_writer = client_conn.writer.raw;
333
334 let target_raw_reader = server_conn.reader.raw;
335 let target_raw_writer = server_conn.writer.raw;
336
337 let packet_logs_txt = Arc::new(tokio::sync::Mutex::new(
338 File::create("combined.txt").await.unwrap(),
339 ));
340
341 let packet_logs_txt_clone = packet_logs_txt.clone();
342 let copy_listen_to_target = tokio::spawn(async move {
343 let mut listen_raw_reader = listen_raw_reader;
344 let mut target_raw_writer = target_raw_writer;
345
346 let packet_logs_txt = packet_logs_txt_clone;
347
348 let mut serverbound_parsed_txt = File::create("serverbound.txt").await.unwrap();
349
350 loop {
351 let packet = match listen_raw_reader.read().await {
352 Ok(p) => p,
353 Err(e) => {
354 error!("Error reading packet from listen: {e}");
355 return;
356 }
357 };
358
359 // decode as a game packet
360 let decoded_packet = azalea_protocol::read::deserialize_packet::<ServerboundGamePacket>(
361 &mut Cursor::new(&packet),
362 );
363
364 if let Ok(decoded_packet) = decoded_packet {
365 let timestamp = chrono::Utc::now();
366 let _ = serverbound_parsed_txt
367 .write_all(format!("{timestamp} {:?}\n", decoded_packet).as_bytes())
368 .await;
369 let _ = packet_logs_txt
370 .lock()
371 .await
372 .write_all(format!("{timestamp} <- {:?}\n", decoded_packet).as_bytes())
373 .await;
374 }
375
376 match target_raw_writer.write(&packet).await {
377 Ok(_) => {}
378 Err(e) => {
379 error!("Error writing packet to target: {e}");
380 return;
381 }
382 }
383 }
384 });
385
386 // write to clientbound.txt in a separate task so it doesn't block receiving
387 // packets
388 let (clientbound_tx, mut clientbound_rx) = tokio::sync::mpsc::unbounded_channel::<Box<[u8]>>();
389 let copy_clientbound_to_file = tokio::spawn(async move {
390 let mut clientbound_parsed_txt = File::create("clientbound.txt").await.unwrap();
391
392 loop {
393 let Some(packet) = clientbound_rx.recv().await else {
394 return;
395 };
396
397 // decode as a game packet
398 let decoded_packet = azalea_protocol::read::deserialize_packet::<ClientboundGamePacket>(
399 &mut Cursor::new(&packet),
400 );
401
402 if let Ok(decoded_packet) = decoded_packet {
403 let timestamp = chrono::Utc::now();
404 let _ = clientbound_parsed_txt
405 .write_all(format!("{timestamp} {decoded_packet:?}\n").as_bytes())
406 .await;
407 let _ = packet_logs_txt
408 .lock()
409 .await
410 .write_all(format!("{timestamp} -> {decoded_packet:?}\n").as_bytes())
411 .await;
412 }
413 }
414 });
415
416 let copy_remote_to_local = tokio::spawn(async move {
417 let mut target_raw_reader = target_raw_reader;
418 let mut listen_raw_writer = listen_raw_writer;
419
420 loop {
421 let packet = match target_raw_reader.read().await {
422 Ok(p) => p,
423 Err(e) => {
424 error!("Error reading packet from target: {e}");
425 return;
426 }
427 };
428
429 clientbound_tx.send(packet.clone()).unwrap();
430
431 match listen_raw_writer.write(&packet).await {
432 Ok(_) => {}
433 Err(e) => {
434 error!("Error writing packet to listen: {e}");
435 return;
436 }
437 }
438 }
439 });
440
441 tokio::try_join!(
442 copy_listen_to_target,
443 copy_remote_to_local,
444 copy_clientbound_to_file
445 )?;
446
447 Ok(())
448}Source§impl Connection<ServerboundHandshakePacket, ClientboundHandshakePacket>
impl Connection<ServerboundHandshakePacket, ClientboundHandshakePacket>
Sourcepub fn login(self) -> Connection<ServerboundLoginPacket, ClientboundLoginPacket>
pub fn login(self) -> Connection<ServerboundLoginPacket, ClientboundLoginPacket>
Change our state from handshake to login.
This is the state that is used while negotiating encryption and authenticating with Mojang.
Examples found in repository?
97async fn handle_connection(stream: TcpStream) -> eyre::Result<()> {
98 stream.set_nodelay(true)?;
99 let ip = stream.peer_addr()?;
100 let mut conn: Connection<ServerboundHandshakePacket, ClientboundHandshakePacket> =
101 Connection::wrap(stream);
102
103 // The first packet sent from a client is the intent packet.
104 // This specifies whether the client is pinging
105 // the server or is going to join the game.
106 let intent = match conn.read().await {
107 Ok(packet) => match packet {
108 ServerboundHandshakePacket::Intention(packet) => {
109 info!(
110 "New connection from {}, hostname {:?}:{}, version {}, {:?}",
111 ip.ip(),
112 packet.hostname,
113 packet.port,
114 packet.protocol_version,
115 packet.intention
116 );
117 packet
118 }
119 },
120 Err(e) => {
121 let e = e.into();
122 warn!("Error during intent: {e}");
123 return Err(e);
124 }
125 };
126
127 match intent.intention {
128 // If the client is pinging the proxy, reply with the information below.
129 ClientIntention::Status => {
130 let mut conn = conn.status();
131 loop {
132 match conn.read().await {
133 Ok(p) => match p {
134 ServerboundStatusPacket::StatusRequest(_) => {
135 conn.write(ClientboundStatusResponse {
136 description: PROXY_DESC.into(),
137 favicon: PROXY_FAVICON.clone(),
138 players: PROXY_PLAYERS.clone(),
139 version: PROXY_VERSION.clone(),
140 enforces_secure_chat: PROXY_SECURE_CHAT,
141 })
142 .await?;
143 }
144 ServerboundStatusPacket::PingRequest(p) => {
145 conn.write(ClientboundPongResponse { time: p.time }).await?;
146 break;
147 }
148 },
149 Err(e) => match *e {
150 ReadPacketError::ConnectionClosed => {
151 break;
152 }
153 e => {
154 warn!("Error during status: {e}");
155 return Err(e.into());
156 }
157 },
158 }
159 }
160 }
161 // If the client intends to join the proxy, wait for them to send the `Hello` packet to log
162 // their username and uuid, then start proxying their connection.
163 ClientIntention::Login => {
164 let mut conn = conn.login();
165 loop {
166 match conn.read().await {
167 Ok(p) => {
168 if let ServerboundLoginPacket::Hello(hello) = p {
169 info!(
170 "Player \'{}\' from {} logging in with uuid: {}",
171 hello.name,
172 ip.ip(),
173 hello.profile_id.to_string()
174 );
175
176 tokio::spawn(proxy_conn(conn).map(|r| {
177 if let Err(e) = r {
178 error!("Failed to proxy: {e}");
179 }
180 }));
181
182 break;
183 }
184 }
185 Err(e) => match *e {
186 ReadPacketError::ConnectionClosed => {
187 break;
188 }
189 e => {
190 warn!("Error during login: {e}");
191 return Err(e.into());
192 }
193 },
194 }
195 }
196 }
197 ClientIntention::Transfer => {
198 warn!("Client attempted to join via transfer")
199 }
200 }
201
202 Ok(())
203}More examples
67async fn handle_connection(stream: TcpStream) -> eyre::Result<()> {
68 stream.set_nodelay(true)?;
69 let ip = stream.peer_addr()?;
70 let mut conn: Connection<ServerboundHandshakePacket, ClientboundHandshakePacket> =
71 Connection::wrap(stream);
72
73 // The first packet sent from a client is the intent packet.
74 // This specifies whether the client is pinging
75 // the server or is going to join the game.
76 let intent = match conn.read().await {
77 Ok(packet) => match packet {
78 ServerboundHandshakePacket::Intention(packet) => {
79 info!(
80 "New connection from {}, hostname {:?}:{}, version {}, {:?}",
81 ip.ip(),
82 packet.hostname,
83 packet.port,
84 packet.protocol_version,
85 packet.intention
86 );
87 packet
88 }
89 },
90 Err(e) => {
91 let e = e.into();
92 warn!("Error during intent: {e}");
93 return Err(e);
94 }
95 };
96
97 match intent.intention {
98 // If the client is pinging the proxy,
99 // reply with the information below.
100 ClientIntention::Status => {
101 let mut conn = conn.status();
102 loop {
103 match conn.read().await {
104 Ok(p) => match p {
105 ServerboundStatusPacket::StatusRequest(_) => {
106 conn.write(ClientboundStatusResponse {
107 description: PROXY_DESC.into(),
108 favicon: PROXY_FAVICON.clone(),
109 players: PROXY_PLAYERS.clone(),
110 version: PROXY_VERSION.clone(),
111 enforces_secure_chat: PROXY_SECURE_CHAT,
112 })
113 .await?;
114 }
115 ServerboundStatusPacket::PingRequest(p) => {
116 conn.write(ClientboundPongResponse { time: p.time }).await?;
117 break;
118 }
119 },
120 Err(e) => match *e {
121 ReadPacketError::ConnectionClosed => {
122 break;
123 }
124 e => {
125 warn!("Error during status: {e}");
126 return Err(e.into());
127 }
128 },
129 }
130 }
131 }
132 // If the client intends to join the proxy,
133 // wait for them to send the `Hello` packet to
134 // log their username and uuid, then forward the
135 // connection along to the proxy target.
136 ClientIntention::Login => {
137 let mut conn = conn.login();
138 loop {
139 match conn.read().await {
140 Ok(p) => {
141 if let ServerboundLoginPacket::Hello(hello) = p {
142 info!(
143 "Player \'{0}\' from {1} logging in with uuid: {2}",
144 hello.name,
145 ip.ip(),
146 hello.profile_id.to_string()
147 );
148
149 tokio::spawn(transfer(conn.unwrap()?, intent, hello).map(|r| {
150 if let Err(e) = r {
151 error!("Failed to proxy: {e}");
152 }
153 }));
154
155 break;
156 }
157 }
158 Err(e) => match *e {
159 ReadPacketError::ConnectionClosed => {
160 break;
161 }
162 e => {
163 warn!("Error during login: {e}");
164 return Err(e.into());
165 }
166 },
167 }
168 }
169 }
170 ClientIntention::Transfer => {
171 warn!("Client attempted to join via transfer")
172 }
173 }
174
175 Ok(())
176}Sourcepub fn status(
self,
) -> Connection<ServerboundStatusPacket, ClientboundStatusPacket>
pub fn status( self, ) -> Connection<ServerboundStatusPacket, ClientboundStatusPacket>
Change our state from handshake to status.
This is the state that is used for pinging the server.
Examples found in repository?
97async fn handle_connection(stream: TcpStream) -> eyre::Result<()> {
98 stream.set_nodelay(true)?;
99 let ip = stream.peer_addr()?;
100 let mut conn: Connection<ServerboundHandshakePacket, ClientboundHandshakePacket> =
101 Connection::wrap(stream);
102
103 // The first packet sent from a client is the intent packet.
104 // This specifies whether the client is pinging
105 // the server or is going to join the game.
106 let intent = match conn.read().await {
107 Ok(packet) => match packet {
108 ServerboundHandshakePacket::Intention(packet) => {
109 info!(
110 "New connection from {}, hostname {:?}:{}, version {}, {:?}",
111 ip.ip(),
112 packet.hostname,
113 packet.port,
114 packet.protocol_version,
115 packet.intention
116 );
117 packet
118 }
119 },
120 Err(e) => {
121 let e = e.into();
122 warn!("Error during intent: {e}");
123 return Err(e);
124 }
125 };
126
127 match intent.intention {
128 // If the client is pinging the proxy, reply with the information below.
129 ClientIntention::Status => {
130 let mut conn = conn.status();
131 loop {
132 match conn.read().await {
133 Ok(p) => match p {
134 ServerboundStatusPacket::StatusRequest(_) => {
135 conn.write(ClientboundStatusResponse {
136 description: PROXY_DESC.into(),
137 favicon: PROXY_FAVICON.clone(),
138 players: PROXY_PLAYERS.clone(),
139 version: PROXY_VERSION.clone(),
140 enforces_secure_chat: PROXY_SECURE_CHAT,
141 })
142 .await?;
143 }
144 ServerboundStatusPacket::PingRequest(p) => {
145 conn.write(ClientboundPongResponse { time: p.time }).await?;
146 break;
147 }
148 },
149 Err(e) => match *e {
150 ReadPacketError::ConnectionClosed => {
151 break;
152 }
153 e => {
154 warn!("Error during status: {e}");
155 return Err(e.into());
156 }
157 },
158 }
159 }
160 }
161 // If the client intends to join the proxy, wait for them to send the `Hello` packet to log
162 // their username and uuid, then start proxying their connection.
163 ClientIntention::Login => {
164 let mut conn = conn.login();
165 loop {
166 match conn.read().await {
167 Ok(p) => {
168 if let ServerboundLoginPacket::Hello(hello) = p {
169 info!(
170 "Player \'{}\' from {} logging in with uuid: {}",
171 hello.name,
172 ip.ip(),
173 hello.profile_id.to_string()
174 );
175
176 tokio::spawn(proxy_conn(conn).map(|r| {
177 if let Err(e) = r {
178 error!("Failed to proxy: {e}");
179 }
180 }));
181
182 break;
183 }
184 }
185 Err(e) => match *e {
186 ReadPacketError::ConnectionClosed => {
187 break;
188 }
189 e => {
190 warn!("Error during login: {e}");
191 return Err(e.into());
192 }
193 },
194 }
195 }
196 }
197 ClientIntention::Transfer => {
198 warn!("Client attempted to join via transfer")
199 }
200 }
201
202 Ok(())
203}More examples
67async fn handle_connection(stream: TcpStream) -> eyre::Result<()> {
68 stream.set_nodelay(true)?;
69 let ip = stream.peer_addr()?;
70 let mut conn: Connection<ServerboundHandshakePacket, ClientboundHandshakePacket> =
71 Connection::wrap(stream);
72
73 // The first packet sent from a client is the intent packet.
74 // This specifies whether the client is pinging
75 // the server or is going to join the game.
76 let intent = match conn.read().await {
77 Ok(packet) => match packet {
78 ServerboundHandshakePacket::Intention(packet) => {
79 info!(
80 "New connection from {}, hostname {:?}:{}, version {}, {:?}",
81 ip.ip(),
82 packet.hostname,
83 packet.port,
84 packet.protocol_version,
85 packet.intention
86 );
87 packet
88 }
89 },
90 Err(e) => {
91 let e = e.into();
92 warn!("Error during intent: {e}");
93 return Err(e);
94 }
95 };
96
97 match intent.intention {
98 // If the client is pinging the proxy,
99 // reply with the information below.
100 ClientIntention::Status => {
101 let mut conn = conn.status();
102 loop {
103 match conn.read().await {
104 Ok(p) => match p {
105 ServerboundStatusPacket::StatusRequest(_) => {
106 conn.write(ClientboundStatusResponse {
107 description: PROXY_DESC.into(),
108 favicon: PROXY_FAVICON.clone(),
109 players: PROXY_PLAYERS.clone(),
110 version: PROXY_VERSION.clone(),
111 enforces_secure_chat: PROXY_SECURE_CHAT,
112 })
113 .await?;
114 }
115 ServerboundStatusPacket::PingRequest(p) => {
116 conn.write(ClientboundPongResponse { time: p.time }).await?;
117 break;
118 }
119 },
120 Err(e) => match *e {
121 ReadPacketError::ConnectionClosed => {
122 break;
123 }
124 e => {
125 warn!("Error during status: {e}");
126 return Err(e.into());
127 }
128 },
129 }
130 }
131 }
132 // If the client intends to join the proxy,
133 // wait for them to send the `Hello` packet to
134 // log their username and uuid, then forward the
135 // connection along to the proxy target.
136 ClientIntention::Login => {
137 let mut conn = conn.login();
138 loop {
139 match conn.read().await {
140 Ok(p) => {
141 if let ServerboundLoginPacket::Hello(hello) = p {
142 info!(
143 "Player \'{0}\' from {1} logging in with uuid: {2}",
144 hello.name,
145 ip.ip(),
146 hello.profile_id.to_string()
147 );
148
149 tokio::spawn(transfer(conn.unwrap()?, intent, hello).map(|r| {
150 if let Err(e) = r {
151 error!("Failed to proxy: {e}");
152 }
153 }));
154
155 break;
156 }
157 }
158 Err(e) => match *e {
159 ReadPacketError::ConnectionClosed => {
160 break;
161 }
162 e => {
163 warn!("Error during login: {e}");
164 return Err(e.into());
165 }
166 },
167 }
168 }
169 }
170 ClientIntention::Transfer => {
171 warn!("Client attempted to join via transfer")
172 }
173 }
174
175 Ok(())
176}Source§impl Connection<ServerboundLoginPacket, ClientboundLoginPacket>
impl Connection<ServerboundLoginPacket, ClientboundLoginPacket>
Sourcepub fn set_compression_threshold(&mut self, threshold: i32)
pub fn set_compression_threshold(&mut self, threshold: i32)
Set our compression threshold, i.e. the maximum size that a packet is allowed to be without getting compressed.
If you set it to less than 0 then compression gets disabled.
Sourcepub fn set_encryption_key(&mut self, key: [u8; 16])
pub fn set_encryption_key(&mut self, key: [u8; 16])
Set the encryption key that is used to encrypt and decrypt packets.
It’s the same for both reading and writing.
Sourcepub fn game(self) -> Connection<ServerboundGamePacket, ClientboundGamePacket>
pub fn game(self) -> Connection<ServerboundGamePacket, ClientboundGamePacket>
Change our state from login to game.
This is the state that’s used when the client is actually in the game.
Sourcepub async fn authenticate(
&self,
username: &str,
public_key: &[u8],
private_key: &[u8; 16],
ip: Option<&str>,
) -> Result<GameProfile, ServerSessionServerError>
pub async fn authenticate( &self, username: &str, public_key: &[u8], private_key: &[u8; 16], ip: Option<&str>, ) -> Result<GameProfile, ServerSessionServerError>
Verify connecting clients have authenticated with Minecraft’s servers.
This must happen after the client sends a ServerboundLoginPacket::Key
packet.
Sourcepub fn config(
self,
) -> Connection<ServerboundConfigPacket, ClientboundConfigPacket>
pub fn config( self, ) -> Connection<ServerboundConfigPacket, ClientboundConfigPacket>
Change our state back to configuration.
Examples found in repository?
205async fn proxy_conn(
206 mut client_conn: Connection<ServerboundLoginPacket, ClientboundLoginPacket>,
207) -> Result<(), Box<dyn Error>> {
208 // resolve TARGET_ADDR
209 let parsed_target_addr = ServerAddr::try_from(TARGET_ADDR).unwrap();
210 let resolved_target_addr = resolve_address(&parsed_target_addr).await?;
211
212 let mut server_conn = Connection::new(&resolved_target_addr).await?;
213
214 let account = if ACCOUNT.contains('@') {
215 Account::microsoft(ACCOUNT).await?
216 } else {
217 Account::offline(ACCOUNT)
218 };
219 println!("got account: {:?}", account);
220
221 server_conn
222 .write(ServerboundIntention {
223 protocol_version: PROTOCOL_VERSION,
224 hostname: parsed_target_addr.host,
225 port: parsed_target_addr.port,
226 intention: ClientIntention::Login,
227 })
228 .await?;
229 let mut server_conn = server_conn.login();
230
231 // login
232 server_conn
233 .write(ServerboundHello {
234 name: account.username().to_owned(),
235 profile_id: account.uuid(),
236 })
237 .await?;
238
239 let (server_conn, login_finished) = loop {
240 let packet = server_conn.read().await?;
241
242 println!("got packet: {:?}", packet);
243
244 match packet {
245 ClientboundLoginPacket::Hello(p) => {
246 debug!("Got encryption request");
247 let e = azalea_crypto::encrypt(&p.public_key, &p.challenge).unwrap();
248
249 if let Some(access_token) = account.access_token() {
250 // keep track of the number of times we tried
251 // authenticating so we can give up after too many
252 let mut attempts: usize = 1;
253
254 while let Err(e) = {
255 server_conn
256 .authenticate(&access_token, &account.uuid(), e.secret_key, &p, None)
257 .await
258 } {
259 if attempts >= 2 {
260 // if this is the second attempt and we failed
261 // both times, give up
262 return Err(e.into());
263 }
264 if matches!(
265 e,
266 ClientSessionServerError::InvalidSession
267 | ClientSessionServerError::ForbiddenOperation
268 ) {
269 // uh oh, we got an invalid session and have
270 // to reauthenticate now
271 account.refresh().await?;
272 } else {
273 return Err(e.into());
274 }
275 attempts += 1;
276 }
277 }
278
279 server_conn
280 .write(ServerboundKey {
281 key_bytes: e.encrypted_public_key,
282 encrypted_challenge: e.encrypted_challenge,
283 })
284 .await?;
285
286 server_conn.set_encryption_key(e.secret_key);
287 }
288 ClientboundLoginPacket::LoginCompression(p) => {
289 debug!("Got compression request {:?}", p.compression_threshold);
290 server_conn.set_compression_threshold(p.compression_threshold);
291 }
292 ClientboundLoginPacket::LoginFinished(p) => {
293 debug!(
294 "Got profile {:?}. handshake is finished and we're now switching to the configuration state",
295 p.game_profile
296 );
297 // server_conn.write(ServerboundLoginAcknowledged {}).await?;
298 break (server_conn.config(), p);
299 }
300 ClientboundLoginPacket::LoginDisconnect(p) => {
301 error!("Got disconnect {p:?}");
302 return Err("Disconnected".into());
303 }
304 ClientboundLoginPacket::CustomQuery(p) => {
305 debug!("Got custom query {:?}", p);
306 // replying to custom query is done in
307 // packet_handling::login::process_packet_events
308 }
309 ClientboundLoginPacket::CookieRequest(p) => {
310 debug!("Got cookie request {:?}", p);
311
312 server_conn
313 .write(packets::login::ServerboundCookieResponse {
314 key: p.key,
315 // cookies aren't implemented
316 payload: None,
317 })
318 .await?;
319 }
320 }
321 };
322
323 // give the client the login_finished
324 println!("got the login_finished: {:?}", login_finished);
325 client_conn.write(login_finished).await?;
326 let client_conn = client_conn.config();
327
328 info!("started direct bridging");
329
330 // bridge packets
331 let listen_raw_reader = client_conn.reader.raw;
332 let listen_raw_writer = client_conn.writer.raw;
333
334 let target_raw_reader = server_conn.reader.raw;
335 let target_raw_writer = server_conn.writer.raw;
336
337 let packet_logs_txt = Arc::new(tokio::sync::Mutex::new(
338 File::create("combined.txt").await.unwrap(),
339 ));
340
341 let packet_logs_txt_clone = packet_logs_txt.clone();
342 let copy_listen_to_target = tokio::spawn(async move {
343 let mut listen_raw_reader = listen_raw_reader;
344 let mut target_raw_writer = target_raw_writer;
345
346 let packet_logs_txt = packet_logs_txt_clone;
347
348 let mut serverbound_parsed_txt = File::create("serverbound.txt").await.unwrap();
349
350 loop {
351 let packet = match listen_raw_reader.read().await {
352 Ok(p) => p,
353 Err(e) => {
354 error!("Error reading packet from listen: {e}");
355 return;
356 }
357 };
358
359 // decode as a game packet
360 let decoded_packet = azalea_protocol::read::deserialize_packet::<ServerboundGamePacket>(
361 &mut Cursor::new(&packet),
362 );
363
364 if let Ok(decoded_packet) = decoded_packet {
365 let timestamp = chrono::Utc::now();
366 let _ = serverbound_parsed_txt
367 .write_all(format!("{timestamp} {:?}\n", decoded_packet).as_bytes())
368 .await;
369 let _ = packet_logs_txt
370 .lock()
371 .await
372 .write_all(format!("{timestamp} <- {:?}\n", decoded_packet).as_bytes())
373 .await;
374 }
375
376 match target_raw_writer.write(&packet).await {
377 Ok(_) => {}
378 Err(e) => {
379 error!("Error writing packet to target: {e}");
380 return;
381 }
382 }
383 }
384 });
385
386 // write to clientbound.txt in a separate task so it doesn't block receiving
387 // packets
388 let (clientbound_tx, mut clientbound_rx) = tokio::sync::mpsc::unbounded_channel::<Box<[u8]>>();
389 let copy_clientbound_to_file = tokio::spawn(async move {
390 let mut clientbound_parsed_txt = File::create("clientbound.txt").await.unwrap();
391
392 loop {
393 let Some(packet) = clientbound_rx.recv().await else {
394 return;
395 };
396
397 // decode as a game packet
398 let decoded_packet = azalea_protocol::read::deserialize_packet::<ClientboundGamePacket>(
399 &mut Cursor::new(&packet),
400 );
401
402 if let Ok(decoded_packet) = decoded_packet {
403 let timestamp = chrono::Utc::now();
404 let _ = clientbound_parsed_txt
405 .write_all(format!("{timestamp} {decoded_packet:?}\n").as_bytes())
406 .await;
407 let _ = packet_logs_txt
408 .lock()
409 .await
410 .write_all(format!("{timestamp} -> {decoded_packet:?}\n").as_bytes())
411 .await;
412 }
413 }
414 });
415
416 let copy_remote_to_local = tokio::spawn(async move {
417 let mut target_raw_reader = target_raw_reader;
418 let mut listen_raw_writer = listen_raw_writer;
419
420 loop {
421 let packet = match target_raw_reader.read().await {
422 Ok(p) => p,
423 Err(e) => {
424 error!("Error reading packet from target: {e}");
425 return;
426 }
427 };
428
429 clientbound_tx.send(packet.clone()).unwrap();
430
431 match listen_raw_writer.write(&packet).await {
432 Ok(_) => {}
433 Err(e) => {
434 error!("Error writing packet to listen: {e}");
435 return;
436 }
437 }
438 }
439 });
440
441 tokio::try_join!(
442 copy_listen_to_target,
443 copy_remote_to_local,
444 copy_clientbound_to_file
445 )?;
446
447 Ok(())
448}Source§impl Connection<ServerboundConfigPacket, ClientboundConfigPacket>
impl Connection<ServerboundConfigPacket, ClientboundConfigPacket>
Sourcepub fn game(self) -> Connection<ServerboundGamePacket, ClientboundGamePacket>
pub fn game(self) -> Connection<ServerboundGamePacket, ClientboundGamePacket>
Change our state from configuration to game.
This is the state that’s used when the client is actually in the world.
Source§impl Connection<ClientboundConfigPacket, ServerboundConfigPacket>
impl Connection<ClientboundConfigPacket, ServerboundConfigPacket>
Sourcepub fn game(self) -> Connection<ClientboundGamePacket, ServerboundGamePacket>
pub fn game(self) -> Connection<ClientboundGamePacket, ServerboundGamePacket>
Change our state from configuration to game.
This is the state that’s used when the client is actually in the world.
Source§impl Connection<ClientboundGamePacket, ServerboundGamePacket>
impl Connection<ClientboundGamePacket, ServerboundGamePacket>
Sourcepub fn config(
self,
) -> Connection<ClientboundConfigPacket, ServerboundConfigPacket>
pub fn config( self, ) -> Connection<ClientboundConfigPacket, ServerboundConfigPacket>
Change our state back to configuration.
Source§impl Connection<ServerboundGamePacket, ClientboundGamePacket>
impl Connection<ServerboundGamePacket, ClientboundGamePacket>
Sourcepub fn config(
self,
) -> Connection<ServerboundConfigPacket, ClientboundConfigPacket>
pub fn config( self, ) -> Connection<ServerboundConfigPacket, ClientboundConfigPacket>
Change our state back to configuration.
Source§impl<R1, W1> Connection<R1, W1>
impl<R1, W1> Connection<R1, W1>
Sourcepub fn from<R2, W2>(connection: Connection<R1, W1>) -> Connection<R2, W2>
pub fn from<R2, W2>(connection: Connection<R1, W1>) -> Connection<R2, W2>
Creates a Connection of a type from a Connection of another type.
Useful for servers or custom packets.
Sourcepub fn wrap(stream: TcpStream) -> Connection<R1, W1>
pub fn wrap(stream: TcpStream) -> Connection<R1, W1>
Convert an existing TcpStream into a Connection. Useful for servers.
Examples found in repository?
97async fn handle_connection(stream: TcpStream) -> eyre::Result<()> {
98 stream.set_nodelay(true)?;
99 let ip = stream.peer_addr()?;
100 let mut conn: Connection<ServerboundHandshakePacket, ClientboundHandshakePacket> =
101 Connection::wrap(stream);
102
103 // The first packet sent from a client is the intent packet.
104 // This specifies whether the client is pinging
105 // the server or is going to join the game.
106 let intent = match conn.read().await {
107 Ok(packet) => match packet {
108 ServerboundHandshakePacket::Intention(packet) => {
109 info!(
110 "New connection from {}, hostname {:?}:{}, version {}, {:?}",
111 ip.ip(),
112 packet.hostname,
113 packet.port,
114 packet.protocol_version,
115 packet.intention
116 );
117 packet
118 }
119 },
120 Err(e) => {
121 let e = e.into();
122 warn!("Error during intent: {e}");
123 return Err(e);
124 }
125 };
126
127 match intent.intention {
128 // If the client is pinging the proxy, reply with the information below.
129 ClientIntention::Status => {
130 let mut conn = conn.status();
131 loop {
132 match conn.read().await {
133 Ok(p) => match p {
134 ServerboundStatusPacket::StatusRequest(_) => {
135 conn.write(ClientboundStatusResponse {
136 description: PROXY_DESC.into(),
137 favicon: PROXY_FAVICON.clone(),
138 players: PROXY_PLAYERS.clone(),
139 version: PROXY_VERSION.clone(),
140 enforces_secure_chat: PROXY_SECURE_CHAT,
141 })
142 .await?;
143 }
144 ServerboundStatusPacket::PingRequest(p) => {
145 conn.write(ClientboundPongResponse { time: p.time }).await?;
146 break;
147 }
148 },
149 Err(e) => match *e {
150 ReadPacketError::ConnectionClosed => {
151 break;
152 }
153 e => {
154 warn!("Error during status: {e}");
155 return Err(e.into());
156 }
157 },
158 }
159 }
160 }
161 // If the client intends to join the proxy, wait for them to send the `Hello` packet to log
162 // their username and uuid, then start proxying their connection.
163 ClientIntention::Login => {
164 let mut conn = conn.login();
165 loop {
166 match conn.read().await {
167 Ok(p) => {
168 if let ServerboundLoginPacket::Hello(hello) = p {
169 info!(
170 "Player \'{}\' from {} logging in with uuid: {}",
171 hello.name,
172 ip.ip(),
173 hello.profile_id.to_string()
174 );
175
176 tokio::spawn(proxy_conn(conn).map(|r| {
177 if let Err(e) = r {
178 error!("Failed to proxy: {e}");
179 }
180 }));
181
182 break;
183 }
184 }
185 Err(e) => match *e {
186 ReadPacketError::ConnectionClosed => {
187 break;
188 }
189 e => {
190 warn!("Error during login: {e}");
191 return Err(e.into());
192 }
193 },
194 }
195 }
196 }
197 ClientIntention::Transfer => {
198 warn!("Client attempted to join via transfer")
199 }
200 }
201
202 Ok(())
203}More examples
67async fn handle_connection(stream: TcpStream) -> eyre::Result<()> {
68 stream.set_nodelay(true)?;
69 let ip = stream.peer_addr()?;
70 let mut conn: Connection<ServerboundHandshakePacket, ClientboundHandshakePacket> =
71 Connection::wrap(stream);
72
73 // The first packet sent from a client is the intent packet.
74 // This specifies whether the client is pinging
75 // the server or is going to join the game.
76 let intent = match conn.read().await {
77 Ok(packet) => match packet {
78 ServerboundHandshakePacket::Intention(packet) => {
79 info!(
80 "New connection from {}, hostname {:?}:{}, version {}, {:?}",
81 ip.ip(),
82 packet.hostname,
83 packet.port,
84 packet.protocol_version,
85 packet.intention
86 );
87 packet
88 }
89 },
90 Err(e) => {
91 let e = e.into();
92 warn!("Error during intent: {e}");
93 return Err(e);
94 }
95 };
96
97 match intent.intention {
98 // If the client is pinging the proxy,
99 // reply with the information below.
100 ClientIntention::Status => {
101 let mut conn = conn.status();
102 loop {
103 match conn.read().await {
104 Ok(p) => match p {
105 ServerboundStatusPacket::StatusRequest(_) => {
106 conn.write(ClientboundStatusResponse {
107 description: PROXY_DESC.into(),
108 favicon: PROXY_FAVICON.clone(),
109 players: PROXY_PLAYERS.clone(),
110 version: PROXY_VERSION.clone(),
111 enforces_secure_chat: PROXY_SECURE_CHAT,
112 })
113 .await?;
114 }
115 ServerboundStatusPacket::PingRequest(p) => {
116 conn.write(ClientboundPongResponse { time: p.time }).await?;
117 break;
118 }
119 },
120 Err(e) => match *e {
121 ReadPacketError::ConnectionClosed => {
122 break;
123 }
124 e => {
125 warn!("Error during status: {e}");
126 return Err(e.into());
127 }
128 },
129 }
130 }
131 }
132 // If the client intends to join the proxy,
133 // wait for them to send the `Hello` packet to
134 // log their username and uuid, then forward the
135 // connection along to the proxy target.
136 ClientIntention::Login => {
137 let mut conn = conn.login();
138 loop {
139 match conn.read().await {
140 Ok(p) => {
141 if let ServerboundLoginPacket::Hello(hello) = p {
142 info!(
143 "Player \'{0}\' from {1} logging in with uuid: {2}",
144 hello.name,
145 ip.ip(),
146 hello.profile_id.to_string()
147 );
148
149 tokio::spawn(transfer(conn.unwrap()?, intent, hello).map(|r| {
150 if let Err(e) = r {
151 error!("Failed to proxy: {e}");
152 }
153 }));
154
155 break;
156 }
157 }
158 Err(e) => match *e {
159 ReadPacketError::ConnectionClosed => {
160 break;
161 }
162 e => {
163 warn!("Error during login: {e}");
164 return Err(e.into());
165 }
166 },
167 }
168 }
169 }
170 ClientIntention::Transfer => {
171 warn!("Client attempted to join via transfer")
172 }
173 }
174
175 Ok(())
176}
177
178async fn transfer(
179 mut inbound: TcpStream,
180 intent: ServerboundIntention,
181 hello: ServerboundHello,
182) -> Result<(), Box<dyn Error>> {
183 let outbound = TcpStream::connect(PROXY_ADDR).await?;
184 let name = hello.name.clone();
185 outbound.set_nodelay(true)?;
186
187 // Repeat the intent and hello packet
188 // received earlier to the proxy target
189 let mut outbound_conn: Connection<ClientboundHandshakePacket, ServerboundHandshakePacket> =
190 Connection::wrap(outbound);
191 outbound_conn.write(intent).await?;
192
193 let mut outbound_conn = outbound_conn.login();
194 outbound_conn.write(hello).await?;
195
196 let mut outbound = outbound_conn.unwrap()?;
197
198 // Split the incoming and outgoing connections in
199 // halves and handle each pair on separate threads.
200 let (mut ri, mut wi) = inbound.split();
201 let (mut ro, mut wo) = outbound.split();
202
203 let client_to_server = async {
204 io::copy(&mut ri, &mut wo).await?;
205 wo.shutdown().await
206 };
207
208 let server_to_client = async {
209 io::copy(&mut ro, &mut wi).await?;
210 wi.shutdown().await
211 };
212
213 tokio::try_join!(client_to_server, server_to_client)?;
214 info!("Player \'{name}\' left the game");
215
216 Ok(())
217}Sourcepub fn unwrap(self) -> Result<TcpStream, ReuniteError>
pub fn unwrap(self) -> Result<TcpStream, ReuniteError>
Convert from a Connection into a TcpStream. Useful for servers.
Examples found in repository?
67async fn handle_connection(stream: TcpStream) -> eyre::Result<()> {
68 stream.set_nodelay(true)?;
69 let ip = stream.peer_addr()?;
70 let mut conn: Connection<ServerboundHandshakePacket, ClientboundHandshakePacket> =
71 Connection::wrap(stream);
72
73 // The first packet sent from a client is the intent packet.
74 // This specifies whether the client is pinging
75 // the server or is going to join the game.
76 let intent = match conn.read().await {
77 Ok(packet) => match packet {
78 ServerboundHandshakePacket::Intention(packet) => {
79 info!(
80 "New connection from {}, hostname {:?}:{}, version {}, {:?}",
81 ip.ip(),
82 packet.hostname,
83 packet.port,
84 packet.protocol_version,
85 packet.intention
86 );
87 packet
88 }
89 },
90 Err(e) => {
91 let e = e.into();
92 warn!("Error during intent: {e}");
93 return Err(e);
94 }
95 };
96
97 match intent.intention {
98 // If the client is pinging the proxy,
99 // reply with the information below.
100 ClientIntention::Status => {
101 let mut conn = conn.status();
102 loop {
103 match conn.read().await {
104 Ok(p) => match p {
105 ServerboundStatusPacket::StatusRequest(_) => {
106 conn.write(ClientboundStatusResponse {
107 description: PROXY_DESC.into(),
108 favicon: PROXY_FAVICON.clone(),
109 players: PROXY_PLAYERS.clone(),
110 version: PROXY_VERSION.clone(),
111 enforces_secure_chat: PROXY_SECURE_CHAT,
112 })
113 .await?;
114 }
115 ServerboundStatusPacket::PingRequest(p) => {
116 conn.write(ClientboundPongResponse { time: p.time }).await?;
117 break;
118 }
119 },
120 Err(e) => match *e {
121 ReadPacketError::ConnectionClosed => {
122 break;
123 }
124 e => {
125 warn!("Error during status: {e}");
126 return Err(e.into());
127 }
128 },
129 }
130 }
131 }
132 // If the client intends to join the proxy,
133 // wait for them to send the `Hello` packet to
134 // log their username and uuid, then forward the
135 // connection along to the proxy target.
136 ClientIntention::Login => {
137 let mut conn = conn.login();
138 loop {
139 match conn.read().await {
140 Ok(p) => {
141 if let ServerboundLoginPacket::Hello(hello) = p {
142 info!(
143 "Player \'{0}\' from {1} logging in with uuid: {2}",
144 hello.name,
145 ip.ip(),
146 hello.profile_id.to_string()
147 );
148
149 tokio::spawn(transfer(conn.unwrap()?, intent, hello).map(|r| {
150 if let Err(e) = r {
151 error!("Failed to proxy: {e}");
152 }
153 }));
154
155 break;
156 }
157 }
158 Err(e) => match *e {
159 ReadPacketError::ConnectionClosed => {
160 break;
161 }
162 e => {
163 warn!("Error during login: {e}");
164 return Err(e.into());
165 }
166 },
167 }
168 }
169 }
170 ClientIntention::Transfer => {
171 warn!("Client attempted to join via transfer")
172 }
173 }
174
175 Ok(())
176}
177
178async fn transfer(
179 mut inbound: TcpStream,
180 intent: ServerboundIntention,
181 hello: ServerboundHello,
182) -> Result<(), Box<dyn Error>> {
183 let outbound = TcpStream::connect(PROXY_ADDR).await?;
184 let name = hello.name.clone();
185 outbound.set_nodelay(true)?;
186
187 // Repeat the intent and hello packet
188 // received earlier to the proxy target
189 let mut outbound_conn: Connection<ClientboundHandshakePacket, ServerboundHandshakePacket> =
190 Connection::wrap(outbound);
191 outbound_conn.write(intent).await?;
192
193 let mut outbound_conn = outbound_conn.login();
194 outbound_conn.write(hello).await?;
195
196 let mut outbound = outbound_conn.unwrap()?;
197
198 // Split the incoming and outgoing connections in
199 // halves and handle each pair on separate threads.
200 let (mut ri, mut wi) = inbound.split();
201 let (mut ro, mut wo) = outbound.split();
202
203 let client_to_server = async {
204 io::copy(&mut ri, &mut wo).await?;
205 wo.shutdown().await
206 };
207
208 let server_to_client = async {
209 io::copy(&mut ro, &mut wi).await?;
210 wi.shutdown().await
211 };
212
213 tokio::try_join!(client_to_server, server_to_client)?;
214 info!("Player \'{name}\' left the game");
215
216 Ok(())
217}Auto Trait Implementations§
impl<R, W> Freeze for Connection<R, W>
impl<R, W> RefUnwindSafe for Connection<R, W>where
R: RefUnwindSafe,
W: RefUnwindSafe,
impl<R, W> Send for Connection<R, W>
impl<R, W> Sync for Connection<R, W>
impl<R, W> Unpin for Connection<R, W>
impl<R, W> UnsafeUnpin for Connection<R, W>
impl<R, W> UnwindSafe for Connection<R, W>where
R: UnwindSafe,
W: UnwindSafe,
Blanket Implementations§
Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
§impl<T> Downcast for Twhere
T: Any,
impl<T> Downcast for Twhere
T: Any,
§fn into_any(self: Box<T>) -> Box<dyn Any>
fn into_any(self: Box<T>) -> Box<dyn Any>
Box<dyn Trait> (where Trait: Downcast) to Box<dyn Any>, which can then be
downcast into Box<dyn ConcreteType> where ConcreteType implements Trait.§fn into_any_rc(self: Rc<T>) -> Rc<dyn Any>
fn into_any_rc(self: Rc<T>) -> Rc<dyn Any>
Rc<Trait> (where Trait: Downcast) to Rc<Any>, which can then be further
downcast into Rc<ConcreteType> where ConcreteType implements Trait.§fn as_any(&self) -> &(dyn Any + 'static)
fn as_any(&self) -> &(dyn Any + 'static)
&Trait (where Trait: Downcast) to &Any. This is needed since Rust cannot
generate &Any’s vtable from &Trait’s.§fn as_any_mut(&mut self) -> &mut (dyn Any + 'static)
fn as_any_mut(&mut self) -> &mut (dyn Any + 'static)
&mut Trait (where Trait: Downcast) to &Any. This is needed since Rust cannot
generate &mut Any’s vtable from &mut Trait’s.