Skip to content
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
4c58778
feat: make `Connection::remote_id` and `Connection::alpn` infallible
Oct 17, 2025
8eaeeb1
maybe add fallible connection struct
Oct 20, 2025
710adbf
refactor: clean up some code
Oct 24, 2025
df1ca8a
refactor: `ZeroRttConnection::into_connection`
Oct 28, 2025
84e59c3
refactor: try `ZeroRttServerConn` and `ZeroRttClientConn` API
Oct 30, 2025
758cd12
chore: codespell and docs
Oct 30, 2025
abf6ae7
refactor: adjustment to converting quinn::Connection to Connection
Oct 30, 2025
3f0defa
docs
Oct 30, 2025
681e879
refactor: move all `Connection`, `Connecting`, etc definitions from `…
Oct 30, 2025
6442fa4
chore: fmt docs clean up
Oct 31, 2025
44e813f
Update iroh/src/endpoint/connection.rs
ramfox Oct 31, 2025
ad879c0
Update iroh/src/endpoint/connection.rs
ramfox Oct 31, 2025
4015bc2
refactor: `conn_from_quinn_conn_dangerous` -> `conn_from_quinn_conn`
Oct 31, 2025
f364864
refactor: remote `Incoming::accept_0rtt` add `ZeroRttConnection` enum…
Oct 31, 2025
826d5ae
Introduce `Accepting` and remove `enum ZeroRttConnection`
matheus23 Oct 31, 2025
738fe6c
Introduce `ConnectingError` and return it for `Connection` conversion…
matheus23 Oct 31, 2025
92a47c5
Fix type errors
matheus23 Oct 31, 2025
f745e62
refactor `ZeroRttServerConnection` to `ZeroRttInConn` and `ZeroRttCli…
Oct 31, 2025
5835b47
refactor: `ZeroRttOutConn` to `OutgoingZeroRttConnection`, `ZeroRttIn…
Nov 1, 2025
5994ce3
refactor: rename `QuinnConnectionError` -> `AuthenticationError`
Nov 3, 2025
5570e1e
Merge branch 'main' into ramfox/infallible-remote-id
ramfox Nov 3, 2025
2c13887
refactor: 0rtt example
Nov 3, 2025
d854f84
test: loop `test_0rtt_after_server_restart`
Nov 3, 2025
307bfa7
Merge remote-tracking branch 'origin/main' into ramfox/infallible-rem…
Frando Nov 3, 2025
75cdf37
fix: fixes after n0-error merge
Frando Nov 3, 2025
529b0d6
fix: connect/accept futs have stack error
Frando Nov 3, 2025
68363a1
fix: remove call to `try_send_rtt_msg` in `Accepting::into_0rtt`, it …
Nov 3, 2025
862fb17
tests: make test fail reliably
Frando Nov 3, 2025
a1b42d4
fixup
Frando Nov 3, 2025
e53f8a7
improve logging spans and error context
Frando Nov 3, 2025
d067554
fix: ensure the underlying quinn connection did not close before conv…
Nov 3, 2025
a3203e0
test: add tracing logs back to `test_0rtt_after_server_restart` test
Nov 3, 2025
b1c42ef
test: remove loop, seems to be failing on first loop anyway
Nov 4, 2025
d3c9600
test: remove relay-only code from server restart test
Nov 4, 2025
742104b
Restructure `spawn_0rtt_server` a bit.
matheus23 Nov 4, 2025
b60b1ec
Adjust doc comments for `handshake_completed` fns
matheus23 Nov 4, 2025
66fb812
Merge branch 'main' into ramfox/infallible-remote-id
dignifiedquire Nov 4, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions iroh/bench/src/iroh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,16 +249,16 @@ pub async fn server(endpoint: Endpoint, opt: Opt) -> Result<()> {
// Handle only the expected amount of clients
for _ in 0..opt.clients {
let incoming = endpoint.accept().await.unwrap();
let connecting = match incoming.accept() {
Ok(connecting) => connecting,
let accepting = match incoming.accept() {
Ok(accepting) => accepting,
Err(err) => {
warn!("incoming connection failed: {err:#}");
// we can carry on in these cases:
// this can be caused by retransmitted datagrams
continue;
}
};
let connection = connecting.await.context("handshake failed")?;
let connection = accepting.await.context("handshake failed")?;

server_tasks.push(tokio::spawn(async move {
loop {
Expand Down
6 changes: 3 additions & 3 deletions iroh/bench/src/quinn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,16 +220,16 @@ pub async fn server(endpoint: Endpoint, opt: Opt) -> Result<()> {
// Handle only the expected amount of clients
for _ in 0..opt.clients {
let incoming = endpoint.accept().await.unwrap();
let connecting = match incoming.accept() {
Ok(connecting) => connecting,
let accepting = match incoming.accept() {
Ok(accepting) => accepting,
Err(err) => {
warn!("incoming connection failed: {err:#}");
// we can carry on in these cases:
// this can be caused by retransmitted datagrams
continue;
}
};
let connection = connecting.await.context("handshake failed")?;
let connection = accepting.await.context("handshake failed")?;

server_tasks.push(tokio::spawn(async move {
loop {
Expand Down
81 changes: 35 additions & 46 deletions iroh/examples/0rtt.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
use std::{env, future::Future, str::FromStr, time::Instant};
use std::{env, str::FromStr, time::Instant};

use clap::Parser;
use data_encoding::HEXLOWER;
use iroh::{
EndpointId, SecretKey,
endpoint::{Connecting, Connection},
};
use n0_future::{StreamExt, future};
use iroh::{EndpointId, SecretKey, endpoint::ZeroRttStatus};
use n0_future::StreamExt;
use n0_snafu::ResultExt;
use n0_watcher::Watcher;
use quinn::{RecvStream, SendStream};
use tracing::{info, trace};

const PINGPONG_ALPN: &[u8] = b"0rtt-pingpong";
Expand Down Expand Up @@ -48,48 +46,15 @@ pub fn get_or_generate_secret_key() -> n0_snafu::Result<SecretKey> {
/// We send the data on the connection. If `proceed` resolves to true,
/// read the response immediately. Otherwise, the stream pair is bad and we need
/// to open a new stream pair.
async fn pingpong(
connection: &Connection,
proceed: impl Future<Output = bool>,
x: u64,
) -> n0_snafu::Result<()> {
let (mut send, recv) = connection.open_bi().await.e()?;
async fn pingpong(mut send: SendStream, mut recv: RecvStream, x: u64) -> n0_snafu::Result<()> {
let data = x.to_be_bytes();
send.write_all(&data).await.e()?;
send.finish().e()?;
let mut recv = if proceed.await {
// use recv directly if we can proceed
recv
} else {
// proceed returned false, so we have learned that the 0-RTT send was rejected.
// at this point we have a fully handshaked connection, so we try again.
let (mut send, recv) = connection.open_bi().await.e()?;
send.write_all(&data).await.e()?;
send.finish().e()?;
recv
};
let echo = recv.read_to_end(8).await.e()?;
assert!(echo == data);
Ok(())
}

async fn pingpong_0rtt(connecting: Connecting, i: u64) -> n0_snafu::Result<Connection> {
let connection = match connecting.into_0rtt() {
Ok((connection, accepted)) => {
trace!("0-RTT possible from our side");
pingpong(&connection, accepted, i).await?;
connection
}
Err(connecting) => {
trace!("0-RTT not possible from our side");
let connection = connecting.await.e()?;
pingpong(&connection, future::ready(true), i).await?;
connection
}
};
Ok(connection)
}

async fn connect(args: Args) -> n0_snafu::Result<()> {
let remote_id = args.endpoint_id.unwrap();
let endpoint = iroh::Endpoint::builder()
Expand All @@ -106,10 +71,36 @@ async fn connect(args: Args) -> n0_snafu::Result<()> {
let connection = if args.disable_0rtt {
let connection = connecting.await.e()?;
trace!("connecting without 0-RTT");
pingpong(&connection, future::ready(true), i).await?;
let (send, recv) = connection.open_bi().await.e()?;
pingpong(send, recv, i).await?;
connection
} else {
pingpong_0rtt(connecting, i).await?
match connecting.into_0rtt() {
Ok(zrtt_connection) => {
trace!("0-RTT possible from our side");
let (send, recv) = zrtt_connection.open_bi().await.e()?;
let zrtt_task = tokio::spawn(pingpong(send, recv, i));
match zrtt_connection.handshake_completed().await? {
ZeroRttStatus::Accepted(conn) => {
let _ = zrtt_task.await.e()?;
conn
}
ZeroRttStatus::Rejected(conn) => {
zrtt_task.abort();
let (send, recv) = conn.open_bi().await.e()?;
pingpong(send, recv, i).await?;
conn
}
}
}
Err(connecting) => {
trace!("0-RTT not possible from our side");
let conn = connecting.await.e()?;
let (send, recv) = conn.open_bi().await.e()?;
pingpong(send, recv, i).await?;
conn
}
}
};
tokio::spawn(async move {
// wait for some time for the handshake to complete and the server
Expand Down Expand Up @@ -152,10 +143,8 @@ async fn accept(_args: Args) -> n0_snafu::Result<()> {
let accept = async move {
while let Some(incoming) = endpoint.accept().await {
tokio::spawn(async move {
let connecting = incoming.accept().e()?;
let (connection, _zero_rtt_accepted) = connecting
.into_0rtt()
.expect("accept into 0.5 RTT always succeeds");
let accepting = incoming.accept().e()?;
let connection = accepting.into_0rtt();
let (mut send, mut recv) = connection.accept_bi().await.e()?;
trace!("recv.is_0rtt: {}", recv.is_0rtt());
let data = recv.read_to_end(8).await.e()?;
Expand Down
8 changes: 4 additions & 4 deletions iroh/examples/dht_discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ async fn chat_server(args: Args) -> n0_snafu::Result<()> {
println!("pkarr z32: {zid}");
println!("see https://app.pkarr.org/?pk={zid}");
while let Some(incoming) = endpoint.accept().await {
let connecting = match incoming.accept() {
Ok(connecting) => connecting,
let accepting = match incoming.accept() {
Ok(accepting) => accepting,
Err(err) => {
warn!("incoming connection failed: {err:#}");
// we can carry on in these cases:
Expand All @@ -88,8 +88,8 @@ async fn chat_server(args: Args) -> n0_snafu::Result<()> {
}
};
tokio::spawn(async move {
let connection = connecting.await.e()?;
let remote_endpoint_id = connection.remote_id()?;
let connection = accepting.await.e()?;
let remote_endpoint_id = connection.remote_id();
println!("got connection from {remote_endpoint_id}");
// just leave the tasks hanging. this is just an example.
let (mut writer, mut reader) = connection.accept_bi().await.e()?;
Expand Down
2 changes: 1 addition & 1 deletion iroh/examples/echo-no-router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ async fn start_accept_side() -> Result<Endpoint> {
let connection = incoming.await.e()?;

// We can get the remote's endpoint id from the connection.
let endpoint_id = connection.remote_id()?;
let endpoint_id = connection.remote_id();
println!("accepted connection from {endpoint_id}");

// Our protocol is a simple request-response protocol, so we expect the
Expand Down
2 changes: 1 addition & 1 deletion iroh/examples/echo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ impl ProtocolHandler for Echo {
/// the connection lasts.
async fn accept(&self, connection: Connection) -> Result<(), AcceptError> {
// We can get the remote's endpoint id from the connection.
let endpoint_id = connection.remote_id()?;
let endpoint_id = connection.remote_id();
println!("accepted connection from {endpoint_id}");

// Our protocol is a simple request-response protocol, so we expect the
Expand Down
10 changes: 5 additions & 5 deletions iroh/examples/listen-unreliable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,18 +62,18 @@ async fn main() -> Result<()> {
// accept incoming connections, returns a normal QUIC connection

while let Some(incoming) = endpoint.accept().await {
let mut connecting = match incoming.accept() {
Ok(connecting) => connecting,
let mut accepting = match incoming.accept() {
Ok(accepting) => accepting,
Err(err) => {
warn!("incoming connection failed: {err:#}");
// we can carry on in these cases:
// this can be caused by retransmitted datagrams
continue;
}
};
let alpn = connecting.alpn().await?;
let conn = connecting.await.e()?;
let endpoint_id = conn.remote_id()?;
let alpn = accepting.alpn().await?;
let conn = accepting.await.e()?;
let endpoint_id = conn.remote_id();
info!(
"new (unreliable) connection from {endpoint_id} with ALPN {}",
String::from_utf8_lossy(&alpn),
Expand Down
10 changes: 5 additions & 5 deletions iroh/examples/listen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,18 +60,18 @@ async fn main() -> n0_snafu::Result<()> {
);
// accept incoming connections, returns a normal QUIC connection
while let Some(incoming) = endpoint.accept().await {
let mut connecting = match incoming.accept() {
Ok(connecting) => connecting,
let mut accepting = match incoming.accept() {
Ok(accepting) => accepting,
Err(err) => {
warn!("incoming connection failed: {err:#}");
// we can carry on in these cases:
// this can be caused by retransmitted datagrams
continue;
}
};
let alpn = connecting.alpn().await?;
let conn = connecting.await.e()?;
let endpoint_id = conn.remote_id()?;
let alpn = accepting.alpn().await?;
let conn = accepting.await.e()?;
let endpoint_id = conn.remote_id();
info!(
"new connection from {endpoint_id} with ALPN {}",
String::from_utf8_lossy(&alpn),
Expand Down
12 changes: 6 additions & 6 deletions iroh/examples/screening-connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use std::sync::{

use iroh::{
Endpoint, EndpointAddr,
endpoint::{Connecting, Connection},
endpoint::{Accepting, Connection},
protocol::{AcceptError, ProtocolHandler, Router},
};
use n0_snafu::{Result, ResultExt};
Expand Down Expand Up @@ -100,10 +100,10 @@ struct ScreenedEcho {
}

impl ProtocolHandler for ScreenedEcho {
/// `on_connecting` allows us to intercept a connection as it's being formed,
/// `on_accepting` allows us to intercept a connection as it's being formed,
/// which is the right place to cut off a connection as early as possible.
/// This is an optional method on the ProtocolHandler trait.
async fn on_connecting(&self, connecting: Connecting) -> Result<Connection, AcceptError> {
async fn on_accepting(&self, accepting: Accepting) -> Result<Connection, AcceptError> {
self.conn_attempt_count.fetch_add(1, Ordering::Relaxed);
let count = self.conn_attempt_count.load(Ordering::Relaxed);

Expand All @@ -113,8 +113,8 @@ impl ProtocolHandler for ScreenedEcho {
return Err(AcceptError::NotAllowed {});
}

// To allow normal connection construction, await the connecting future & return
let conn = connecting.await?;
// To allow normal connection construction, await the accepting future & return
let conn = accepting.await?;
Ok(conn)
}

Expand All @@ -125,7 +125,7 @@ impl ProtocolHandler for ScreenedEcho {
/// the connection lasts.
async fn accept(&self, connection: Connection) -> Result<(), AcceptError> {
// We can get the remote's endpoint id from the connection.
let endpoint_id = connection.remote_id()?;
let endpoint_id = connection.remote_id();
println!("accepted connection from {endpoint_id}");

// Our protocol is a simple request-response protocol, so we expect the
Expand Down
2 changes: 1 addition & 1 deletion iroh/examples/search.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ impl ProtocolHandler for BlobSearch {
/// the connection lasts.
async fn accept(&self, connection: Connection) -> Result<(), AcceptError> {
// We can get the remote's endpoint id from the connection.
let endpoint_id = connection.remote_id()?;
let endpoint_id = connection.remote_id();
println!("accepted connection from {endpoint_id}");

// Our protocol is a simple request-response protocol, so we expect the
Expand Down
8 changes: 4 additions & 4 deletions iroh/examples/transfer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -327,8 +327,8 @@ async fn provide(endpoint: Endpoint, size: u64) -> Result<()> {

// accept incoming connections, returns a normal QUIC connection
while let Some(incoming) = endpoint.accept().await {
let connecting = match incoming.accept() {
Ok(connecting) => connecting,
let accepting = match incoming.accept() {
Ok(accepting) => accepting,
Err(err) => {
warn!("incoming connection failed: {err:#}");
// we can carry on in these cases:
Expand All @@ -339,8 +339,8 @@ async fn provide(endpoint: Endpoint, size: u64) -> Result<()> {
// spawn a task to handle reading and writing off of the connection
let endpoint_clone = endpoint.clone();
tokio::spawn(async move {
let conn = connecting.await.e()?;
let endpoint_id = conn.remote_id()?;
let conn = accepting.await.e()?;
let endpoint_id = conn.remote_id();
info!(
"new connection from {endpoint_id} with ALPN {}",
String::from_utf8_lossy(TRANSFER_ALPN),
Expand Down
8 changes: 4 additions & 4 deletions iroh/src/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -929,9 +929,9 @@ mod tests {
// Keep connections alive until the task is dropped.
let mut connections = Vec::new();
// we skip accept() errors, they can be caused by retransmits
while let Some(connecting) = ep.accept().await.and_then(|inc| inc.accept().ok()) {
while let Some(accepting) = ep.accept().await.and_then(|inc| inc.accept().ok()) {
// Just accept incoming connections, but don't do anything with them.
let conn = connecting.await.context("connecting")?;
let conn = accepting.await.context("accepting")?;
connections.push(conn);
}

Expand Down Expand Up @@ -1091,8 +1091,8 @@ mod test_dns_pkarr {
let ep = ep.clone();
async move {
// we skip accept() errors, they can be caused by retransmits
while let Some(connecting) = ep.accept().await.and_then(|inc| inc.accept().ok()) {
let _conn = connecting.await.context("connecting")?;
while let Some(accepting) = ep.accept().await.and_then(|inc| inc.accept().ok()) {
let _conn = accepting.await.context("accepting")?;
// Just accept incoming connections, but don't do anything with them.
}

Expand Down
Loading
Loading