Skip to content

Commit e2dc591

Browse files
ramfox“ramfox”matheus23Frandodignifiedquire
authored
feat: make Connection::remote_id and Connection::alpn infallible (#3556)
## Description This PR refactors the connection API to make `Connection::remote_id()` and `Connection::alpn()` infallible. Previously, these methods could fail if called before the handshake completed or if the handshake data was unavailable. Now, `Connection` guarantees that it represents a fully authenticated connection with verified remote identity and ALPN protocol, since it now can only be constructed after successful handshake completion and authentication, eliminating the need for fallible accessors. ### 0-RTT API Improvements The 0-RTT API has been restructured with clearer types and semantics: - Use `Incoming::accept` to return an `Accepting`. Use `Accepting::into_0rtt` to return an `IncomingZeroRttConnection` - Use `Connecting::into_0rtt` to return a `OutgoingZeroRttConnection` - **`OutgoingZeroRttConnection`**: Represents client-side 0-RTT connections created via `Connecting::into_0rtt()`. Allows sending 0-RTT data before the handshake completes. Call `handshake_completed()` to get a `ZeroRttStatus` indicating whether the 0-RTT data was accepted or rejected by the server. - **`IncomingZeroRttConnection`**: Represents server-side 0-RTT/0.5-RTT connections created via `Accepting::into_0rtt()`. Allows receiving 0-RTT data from clients or sending 0.5-RTT data before the handshake completes. Call `handshake_completed()` to get a fully authenticated `Connection`. - **`ZeroRttStatus` enum**: Returned by `OutgoingZeroRttConnection::handshake_completed()` to indicate whether the server accepted or rejected the 0-RTT data: - `ZeroRttStatus::Accepted(Connection)`: 0-RTT data was accepted, streams opened before handshake remain valid - `ZeroRttStatus::Rejected(Connection)`: 0-RTT data was rejected, pre-handshake streams will error and data must be resent These types replace the previous the previous version of `Connection` & the `ZeroRttAccepted` type and provide a more explicit API for handling 0-RTT connection states and outcomes. ### Error Handling - Connection authentication failures now return `AuthenticationError` during the conversion between a `quinn:Connection` (which can exist before the handshake completes) and an `iroh::Connection`. This error may occur if the remote endpoint is not using the expected TLS cryptography or if the remote endpoint is a plain quinn connection. You may also see these errors after calling the `handshake_completed` method on `IncomingZeroRttConnection` or `OutgoingZeroRttConnection`, if the connection was closed before the handshake completed. - New `ConnectingError` type for errors during handshake completion ## Breaking Changes - `iroh` - `changed` - `ConnectError::Connection` - fields changed - `AcceptError::Connection` - fields changed - `AcceptError::MissingRemoteEndpointId` - fields changed - `AcceptError::NotAllowed` - fields changed - `AcceptError::User` - fields changed - `Connecting::into_0rtt` -> returns `Result<OutgoingZeroRttConnection, Connecting>` - `removed` - `ProtocolHandler::on_connecting()` removed - implement `on_accepting()` instead, which takes `Accepting` rather than `Connecting` - `DynProtocolHandler::on_connecting()` removed - implement `on_accepting()` instead - `iroh::endpoint::IncomingFuture` - use `Accepting` instead - `iroh::endpoint::ZeroRttAccepted` - replaced by explicit 0-RTT connection types --------- Co-authored-by: “ramfox” <“[email protected]”> Co-authored-by: Philipp Krüger <[email protected]> Co-authored-by: Frando <[email protected]> Co-authored-by: Friedel Ziegelmayer <[email protected]>
1 parent 30c23e8 commit e2dc591

File tree

18 files changed

+1898
-1046
lines changed

18 files changed

+1898
-1046
lines changed

iroh/bench/src/iroh.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -249,16 +249,16 @@ pub async fn server(endpoint: Endpoint, opt: Opt) -> Result<()> {
249249
// Handle only the expected amount of clients
250250
for _ in 0..opt.clients {
251251
let incoming = endpoint.accept().await.unwrap();
252-
let connecting = match incoming.accept() {
253-
Ok(connecting) => connecting,
252+
let accepting = match incoming.accept() {
253+
Ok(accepting) => accepting,
254254
Err(err) => {
255255
warn!("incoming connection failed: {err:#}");
256256
// we can carry on in these cases:
257257
// this can be caused by retransmitted datagrams
258258
continue;
259259
}
260260
};
261-
let connection = connecting.await.std_context("handshake failed")?;
261+
let connection = accepting.await.context("handshake failed")?;
262262

263263
server_tasks.push(tokio::spawn(async move {
264264
loop {

iroh/bench/src/quinn.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -220,16 +220,16 @@ pub async fn server(endpoint: Endpoint, opt: Opt) -> Result<()> {
220220
// Handle only the expected amount of clients
221221
for _ in 0..opt.clients {
222222
let incoming = endpoint.accept().await.unwrap();
223-
let connecting = match incoming.accept() {
224-
Ok(connecting) => connecting,
223+
let accepting = match incoming.accept() {
224+
Ok(accepting) => accepting,
225225
Err(err) => {
226226
warn!("incoming connection failed: {err:#}");
227227
// we can carry on in these cases:
228228
// this can be caused by retransmitted datagrams
229229
continue;
230230
}
231231
};
232-
let connection = connecting.await.std_context("handshake failed")?;
232+
let connection = accepting.await.std_context("handshake failed")?;
233233

234234
server_tasks.push(tokio::spawn(async move {
235235
loop {

iroh/examples/0rtt.rs

Lines changed: 59 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,11 @@
1-
use std::{env, future::Future, str::FromStr, time::Instant};
1+
use std::{env, str::FromStr, time::Instant};
22

33
use clap::Parser;
44
use data_encoding::HEXLOWER;
5-
use iroh::{
6-
EndpointId, SecretKey,
7-
endpoint::{Connecting, Connection},
8-
};
9-
use n0_error::{Result, StackResultExt, StdResultExt, bail_any};
10-
use n0_future::{StreamExt, future};
11-
use n0_watcher::Watcher;
5+
use iroh::{EndpointId, SecretKey, discovery::Discovery, endpoint::ZeroRttStatus};
6+
use n0_error::{Result, StackResultExt, StdResultExt};
7+
use n0_future::StreamExt;
8+
use quinn::{RecvStream, SendStream};
129
use tracing::{info, trace};
1310

1411
const PINGPONG_ALPN: &[u8] = b"0rtt-pingpong";
@@ -44,59 +41,39 @@ pub fn get_or_generate_secret_key() -> Result<SecretKey> {
4441
}
4542

4643
/// Do a simple ping-pong with the given connection.
47-
///
48-
/// We send the data on the connection. If `proceed` resolves to true,
49-
/// read the response immediately. Otherwise, the stream pair is bad and we need
50-
/// to open a new stream pair.
51-
async fn pingpong(
52-
connection: &Connection,
53-
proceed: impl Future<Output = bool>,
54-
x: u64,
55-
) -> Result<()> {
56-
let (mut send, recv) = connection.open_bi().await.anyerr()?;
44+
async fn pingpong(send: SendStream, recv: RecvStream, x: u64) -> Result<()> {
45+
ping(send, x).await?;
46+
pong(recv, x).await
47+
}
48+
49+
async fn ping(mut send: SendStream, x: u64) -> Result<()> {
5750
let data = x.to_be_bytes();
5851
send.write_all(&data).await.anyerr()?;
59-
send.finish().anyerr()?;
60-
let mut recv = if proceed.await {
61-
// use recv directly if we can proceed
62-
recv
63-
} else {
64-
// proceed returned false, so we have learned that the 0-RTT send was rejected.
65-
// at this point we have a fully handshaked connection, so we try again.
66-
let (mut send, recv) = connection.open_bi().await.anyerr()?;
67-
send.write_all(&data).await.anyerr()?;
68-
send.finish().anyerr()?;
69-
recv
70-
};
52+
send.finish().anyerr()
53+
}
54+
55+
async fn pong(mut recv: RecvStream, x: u64) -> Result<()> {
56+
let data = x.to_be_bytes();
7157
let echo = recv.read_to_end(8).await.anyerr()?;
7258
assert!(echo == data);
7359
Ok(())
7460
}
7561

76-
async fn pingpong_0rtt(connecting: Connecting, i: u64) -> Result<Connection> {
77-
let connection = match connecting.into_0rtt() {
78-
Ok((connection, accepted)) => {
79-
trace!("0-RTT possible from our side");
80-
pingpong(&connection, accepted, i).await?;
81-
connection
82-
}
83-
Err(connecting) => {
84-
trace!("0-RTT not possible from our side");
85-
let connection = connecting.await.anyerr()?;
86-
pingpong(&connection, future::ready(true), i).await?;
87-
connection
88-
}
89-
};
90-
Ok(connection)
91-
}
92-
9362
async fn connect(args: Args) -> Result<()> {
94-
let remote_id = args.endpoint_id.context("Missing endpoint id")?;
63+
let remote_id = args.endpoint_id.unwrap();
9564
let endpoint = iroh::Endpoint::builder()
9665
.relay_mode(iroh::RelayMode::Disabled)
9766
.keylog(true)
9867
.bind()
9968
.await?;
69+
// ensure we have resolved the remote_id before connecting
70+
// so we get a more accurate connection timing
71+
let mut discovery_stream = endpoint
72+
.discovery()
73+
.resolve(remote_id)
74+
.expect("discovery to be enabled");
75+
let _ = discovery_stream.next().await;
76+
10077
let t0 = Instant::now();
10178
for i in 0..args.rounds {
10279
let t0 = Instant::now();
@@ -106,18 +83,40 @@ async fn connect(args: Args) -> Result<()> {
10683
let connection = if args.disable_0rtt {
10784
let connection = connecting.await.anyerr()?;
10885
trace!("connecting without 0-RTT");
109-
pingpong(&connection, future::ready(true), i).await?;
86+
let (send, recv) = connection.open_bi().await.anyerr()?;
87+
pingpong(send, recv, i).await?;
11088
connection
11189
} else {
112-
pingpong_0rtt(connecting, i).await?
90+
match connecting.into_0rtt() {
91+
Ok(zrtt_connection) => {
92+
trace!("0-RTT possible from our side");
93+
let (send, recv) = zrtt_connection.open_bi().await.anyerr()?;
94+
// before we get the full handshake, attempt to send 0-RTT data
95+
let zrtt_task = tokio::spawn(ping(send, i));
96+
match zrtt_connection.handshake_completed().await? {
97+
ZeroRttStatus::Accepted(conn) => {
98+
let _ = zrtt_task.await.anyerr()?;
99+
pong(recv, i).await?;
100+
conn
101+
}
102+
ZeroRttStatus::Rejected(conn) => {
103+
zrtt_task.abort();
104+
let (send, recv) = conn.open_bi().await.anyerr()?;
105+
pingpong(send, recv, i).await?;
106+
conn
107+
}
108+
}
109+
}
110+
Err(connecting) => {
111+
trace!("0-RTT not possible from our side");
112+
let conn = connecting.await.anyerr()?;
113+
let (send, recv) = conn.open_bi().await.anyerr()?;
114+
pingpong(send, recv, i).await?;
115+
conn
116+
}
117+
}
113118
};
114-
tokio::spawn(async move {
115-
// wait for some time for the handshake to complete and the server
116-
// to send a NewSessionTicket. This is less than ideal, but we
117-
// don't have a better way to wait for the handshake to complete.
118-
tokio::time::sleep(connection.rtt() * 2).await;
119-
connection.close(0u8.into(), b"");
120-
});
119+
connection.close(0u8.into(), b"");
121120
let elapsed = t0.elapsed();
122121
println!("round {i}: {} us", elapsed.as_micros());
123122
}
@@ -138,24 +137,13 @@ async fn accept(_args: Args) -> Result<()> {
138137
.relay_mode(iroh::RelayMode::Disabled)
139138
.bind()
140139
.await?;
141-
let mut addrs = endpoint.watch_addr().stream();
142-
let addr = loop {
143-
let Some(addr) = addrs.next().await else {
144-
bail_any!("Address stream closed");
145-
};
146-
if !addr.ip_addrs().count() == 0 {
147-
break addr;
148-
}
149-
};
150-
println!("Listening on: {addr:?}");
140+
println!("endpoint id: {}", endpoint.id());
151141

152142
let accept = async move {
153143
while let Some(incoming) = endpoint.accept().await {
154144
tokio::spawn(async move {
155-
let connecting = incoming.accept().anyerr()?;
156-
let (connection, _zero_rtt_accepted) = connecting
157-
.into_0rtt()
158-
.expect("accept into 0.5 RTT always succeeds");
145+
let accepting = incoming.accept().anyerr()?;
146+
let connection = accepting.into_0rtt();
159147
let (mut send, mut recv) = connection.accept_bi().await.anyerr()?;
160148
trace!("recv.is_0rtt: {}", recv.is_0rtt());
161149
let data = recv.read_to_end(8).await.anyerr()?;

iroh/examples/dht_discovery.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -78,8 +78,8 @@ async fn chat_server(args: Args) -> Result<()> {
7878
println!("pkarr z32: {zid}");
7979
println!("see https://app.pkarr.org/?pk={zid}");
8080
while let Some(incoming) = endpoint.accept().await {
81-
let connecting = match incoming.accept() {
82-
Ok(connecting) => connecting,
81+
let accepting = match incoming.accept() {
82+
Ok(accepting) => accepting,
8383
Err(err) => {
8484
warn!("incoming connection failed: {err:#}");
8585
// we can carry on in these cases:
@@ -88,8 +88,8 @@ async fn chat_server(args: Args) -> Result<()> {
8888
}
8989
};
9090
tokio::spawn(async move {
91-
let connection = connecting.await.anyerr()?;
92-
let remote_endpoint_id = connection.remote_id()?;
91+
let connection = accepting.await?;
92+
let remote_endpoint_id = connection.remote_id();
9393
println!("got connection from {remote_endpoint_id}");
9494
// just leave the tasks hanging. this is just an example.
9595
let (mut writer, mut reader) = connection.accept_bi().await.anyerr()?;

iroh/examples/echo-no-router.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ async fn start_accept_side() -> Result<Endpoint> {
8989
let connection = incoming.await.anyerr()?;
9090

9191
// We can get the remote's endpoint id from the connection.
92-
let endpoint_id = connection.remote_id()?;
92+
let endpoint_id = connection.remote_id();
9393
println!("accepted connection from {endpoint_id}");
9494

9595
// Our protocol is a simple request-response protocol, so we expect the

iroh/examples/echo.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ impl ProtocolHandler for Echo {
8686
/// the connection lasts.
8787
async fn accept(&self, connection: Connection) -> Result<(), AcceptError> {
8888
// We can get the remote's endpoint id from the connection.
89-
let endpoint_id = connection.remote_id()?;
89+
let endpoint_id = connection.remote_id();
9090
println!("accepted connection from {endpoint_id}");
9191

9292
// Our protocol is a simple request-response protocol, so we expect the

iroh/examples/listen-unreliable.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -62,18 +62,18 @@ async fn main() -> Result<()> {
6262
// accept incoming connections, returns a normal QUIC connection
6363

6464
while let Some(incoming) = endpoint.accept().await {
65-
let mut connecting = match incoming.accept() {
66-
Ok(connecting) => connecting,
65+
let mut accepting = match incoming.accept() {
66+
Ok(accepting) => accepting,
6767
Err(err) => {
6868
warn!("incoming connection failed: {err:#}");
6969
// we can carry on in these cases:
7070
// this can be caused by retransmitted datagrams
7171
continue;
7272
}
7373
};
74-
let alpn = connecting.alpn().await?;
75-
let conn = connecting.await.anyerr()?;
76-
let endpoint_id = conn.remote_id()?;
74+
let alpn = accepting.alpn().await?;
75+
let conn = accepting.await?;
76+
let endpoint_id = conn.remote_id();
7777
info!(
7878
"new (unreliable) connection from {endpoint_id} with ALPN {}",
7979
String::from_utf8_lossy(&alpn),

iroh/examples/listen.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -60,18 +60,18 @@ async fn main() -> Result<()> {
6060
);
6161
// accept incoming connections, returns a normal QUIC connection
6262
while let Some(incoming) = endpoint.accept().await {
63-
let mut connecting = match incoming.accept() {
64-
Ok(connecting) => connecting,
63+
let mut accepting = match incoming.accept() {
64+
Ok(accepting) => accepting,
6565
Err(err) => {
6666
warn!("incoming connection failed: {err:#}");
6767
// we can carry on in these cases:
6868
// this can be caused by retransmitted datagrams
6969
continue;
7070
}
7171
};
72-
let alpn = connecting.alpn().await?;
73-
let conn = connecting.await.anyerr()?;
74-
let endpoint_id = conn.remote_id()?;
72+
let alpn = accepting.alpn().await?;
73+
let conn = accepting.await?;
74+
let endpoint_id = conn.remote_id();
7575
info!(
7676
"new connection from {endpoint_id} with ALPN {}",
7777
String::from_utf8_lossy(&alpn),

iroh/examples/screening-connection.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use std::sync::{
1414

1515
use iroh::{
1616
Endpoint, EndpointAddr,
17-
endpoint::{Connecting, Connection},
17+
endpoint::{Accepting, Connection},
1818
protocol::{AcceptError, ProtocolHandler, Router},
1919
};
2020
use n0_error::{Result, StdResultExt, e};
@@ -100,10 +100,10 @@ struct ScreenedEcho {
100100
}
101101

102102
impl ProtocolHandler for ScreenedEcho {
103-
/// `on_connecting` allows us to intercept a connection as it's being formed,
103+
/// `on_accepting` allows us to intercept a connection as it's being formed,
104104
/// which is the right place to cut off a connection as early as possible.
105105
/// This is an optional method on the ProtocolHandler trait.
106-
async fn on_connecting(&self, connecting: Connecting) -> Result<Connection, AcceptError> {
106+
async fn on_accepting(&self, accepting: Accepting) -> Result<Connection, AcceptError> {
107107
self.conn_attempt_count.fetch_add(1, Ordering::Relaxed);
108108
let count = self.conn_attempt_count.load(Ordering::Relaxed);
109109

@@ -113,8 +113,8 @@ impl ProtocolHandler for ScreenedEcho {
113113
return Err(e!(AcceptError::NotAllowed));
114114
}
115115

116-
// To allow normal connection construction, await the connecting future & return
117-
let conn = connecting.await?;
116+
// To allow normal connection construction, await the accepting future & return
117+
let conn = accepting.await?;
118118
Ok(conn)
119119
}
120120

@@ -125,7 +125,7 @@ impl ProtocolHandler for ScreenedEcho {
125125
/// the connection lasts.
126126
async fn accept(&self, connection: Connection) -> Result<(), AcceptError> {
127127
// We can get the remote's endpoint id from the connection.
128-
let endpoint_id = connection.remote_id()?;
128+
let endpoint_id = connection.remote_id();
129129
println!("accepted connection from {endpoint_id}");
130130

131131
// Our protocol is a simple request-response protocol, so we expect the

iroh/examples/search.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ impl ProtocolHandler for BlobSearch {
128128
/// the connection lasts.
129129
async fn accept(&self, connection: Connection) -> Result<(), AcceptError> {
130130
// We can get the remote's endpoint id from the connection.
131-
let endpoint_id = connection.remote_id()?;
131+
let endpoint_id = connection.remote_id();
132132
println!("accepted connection from {endpoint_id}");
133133

134134
// Our protocol is a simple request-response protocol, so we expect the

0 commit comments

Comments
 (0)