From 00c7d671631cc521eeb51c139b30c50b4f9ba86a Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Sat, 22 Nov 2025 09:55:26 +0100 Subject: [PATCH 1/8] feat(iroh): allow IP transports to bind to multiple interfaces --- iroh/src/endpoint.rs | 58 +++++++++--- iroh/src/magicsock/transports.rs | 31 ++++--- iroh/src/magicsock/transports/ip.rs | 139 ++++++++++++++++++++++++---- 3 files changed, 182 insertions(+), 46 deletions(-) diff --git a/iroh/src/endpoint.rs b/iroh/src/endpoint.rs index 99d6ab7436..3e6917336b 100644 --- a/iroh/src/endpoint.rs +++ b/iroh/src/endpoint.rs @@ -11,16 +11,17 @@ //! //! [module docs]: crate -use std::{ - net::{SocketAddr, SocketAddrV4, SocketAddrV6}, - sync::Arc, -}; +#[cfg(not(wasm_browser))] +use std::net::{Ipv4Addr, Ipv6Addr}; +use std::{net::SocketAddr, sync::Arc}; use iroh_base::{EndpointAddr, EndpointId, RelayUrl, SecretKey, TransportAddr}; use iroh_relay::{RelayConfig, RelayMap}; use n0_error::{e, ensure, stack_error}; use n0_future::time::Duration; use n0_watcher::Watcher; +#[cfg(not(wasm_browser))] +pub use netdev::ipnet::{Ipv4Net, Ipv6Net}; use tracing::{debug, instrument, trace, warn}; use url::Url; @@ -73,6 +74,8 @@ pub use self::connection::{ IncomingZeroRttConnection, OutgoingZeroRtt, OutgoingZeroRttConnection, RemoteEndpointIdError, ZeroRttStatus, }; +#[cfg(not(wasm_browser))] +pub use crate::magicsock::transports::IpConfig; pub use crate::magicsock::transports::TransportConfig; /// Builder for [`Endpoint`]. @@ -236,11 +239,24 @@ impl Builder { /// /// Setting the port to `0` will use a random port. /// If the port specified is already in use, it will fallback to choosing a random port. + /// + /// Only a single interface can be the default, so this will replace the existing default #[cfg(not(wasm_browser))] - pub fn bind_addr_v4(mut self, bind_addr: SocketAddrV4) -> Self { - self.transports.push(TransportConfig::Ip { - bind_addr: bind_addr.into(), - }); + pub fn bind_addr_v4_default(mut self, ip_addr: Ipv4Addr, port: u16) -> Self { + self.transports + .retain(|t| !matches!(t, TransportConfig::Ip(IpConfig::V4Default { .. }))); + self.transports + .push(TransportConfig::Ip(IpConfig::V4Default { ip_addr, port })); + self + } + + /// Binds an ipv4 socket + /// + /// If you want to remove the default transports, make sure to call `clear_ip` first. + #[cfg(not(wasm_browser))] + pub fn bind_addr_v4(mut self, ip_addr: Ipv4Net, port: u16) -> Self { + self.transports + .push(TransportConfig::Ip(IpConfig::V4 { ip_addr, port })); self } @@ -250,11 +266,29 @@ impl Builder { /// /// Setting the port to `0` will use a random port. /// If the port specified is already in use, it will fallback to choosing a random port. + /// + /// Only a single interface can be the default, so this will replace the existing default #[cfg(not(wasm_browser))] - pub fn bind_addr_v6(mut self, bind_addr: SocketAddrV6) -> Self { - self.transports.push(TransportConfig::Ip { - bind_addr: bind_addr.into(), - }); + pub fn bind_addr_v6_default(mut self, ip_addr: Ipv6Addr, scope_id: u32, port: u16) -> Self { + self.transports + .push(TransportConfig::Ip(IpConfig::V6Default { + ip_addr, + scope_id, + port, + })); + self + } + + /// Binds an ipv6 socket + /// + /// If you want to remove the default transports, make sure to call `clear_ip` first. + #[cfg(not(wasm_browser))] + pub fn bind_addr_v6(mut self, ip_addr: Ipv6Net, scope_id: u32, port: u16) -> Self { + self.transports.push(TransportConfig::Ip(IpConfig::V6 { + ip_addr, + scope_id, + port, + })); self } diff --git a/iroh/src/magicsock/transports.rs b/iroh/src/magicsock/transports.rs index 7b95ec8bee..e0a399a090 100644 --- a/iroh/src/magicsock/transports.rs +++ b/iroh/src/magicsock/transports.rs @@ -22,6 +22,8 @@ use crate::{metrics::EndpointMetrics, net_report::Report}; mod ip; mod relay; +#[cfg(not(wasm_browser))] +pub use self::ip::Config as IpConfig; #[cfg(not(wasm_browser))] pub(crate) use self::ip::IpTransport; #[cfg(not(wasm_browser))] @@ -68,33 +70,34 @@ pub(crate) type LocalAddrsWatch = n0_watcher::Map< pub enum TransportConfig { /// IP based transport #[cfg(not(wasm_browser))] - Ip { - /// The address this transport will bind on. - bind_addr: SocketAddr, - }, + Ip(ip::Config), /// Relay transport Relay { /// The [`RelayMap`] used for this relay. relay_map: RelayMap, }, } + impl TransportConfig { /// Configures a default IPv4 transport, listening on `0.0.0.0:0`. #[cfg(not(wasm_browser))] pub fn default_ipv4() -> Self { - use std::net::{Ipv4Addr, SocketAddrV4}; + use std::net::Ipv4Addr; - Self::Ip { - bind_addr: SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0)), - } + Self::Ip(ip::Config::V4Default { + ip_addr: Ipv4Addr::UNSPECIFIED, + port: 0, + }) } /// Configures a default IPv6 transport, listening on `[::]:0`. #[cfg(not(wasm_browser))] pub fn default_ipv6() -> Self { - Self::Ip { - bind_addr: SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::UNSPECIFIED, 0, 0, 0)), - } + Self::Ip(ip::Config::V6Default { + ip_addr: Ipv6Addr::UNSPECIFIED, + scope_id: 0, + port: 0, + }) } } @@ -102,13 +105,13 @@ impl TransportConfig { fn bind_ip(configs: &[TransportConfig], metrics: &EndpointMetrics) -> io::Result> { let mut transports = Vec::new(); for config in configs { - if let TransportConfig::Ip { bind_addr } = config { - match IpTransport::bind(*bind_addr, metrics.magicsock.clone()) { + if let TransportConfig::Ip(config) = config { + match IpTransport::bind(*config, metrics.magicsock.clone()) { Ok(transport) => { transports.push(transport); } Err(err) => { - if bind_addr.is_ipv6() { + if config.is_ipv6() { tracing::info!("bind ignoring IPv6 bind failure: {:?}", err); } else { return Err(err); diff --git a/iroh/src/magicsock/transports/ip.rs b/iroh/src/magicsock/transports/ip.rs index dc4ab293da..745cab5a5d 100644 --- a/iroh/src/magicsock/transports/ip.rs +++ b/iroh/src/magicsock/transports/ip.rs @@ -1,12 +1,14 @@ use std::{ io, - net::{IpAddr, SocketAddr}, + net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6}, pin::Pin, sync::Arc, task::{Context, Poll}, }; use n0_watcher::Watchable; +#[cfg(not(wasm_browser))] +use netdev::ipnet::{Ipv4Net, Ipv6Net}; use netwatch::{UdpSender, UdpSocket}; use pin_project::pin_project; use tracing::{debug, trace}; @@ -16,15 +18,116 @@ use crate::metrics::MagicsockMetrics; #[derive(Debug)] pub(crate) struct IpTransport { - bind_addr: SocketAddr, + config: Config, socket: Arc, local_addr: Watchable, metrics: Arc, } -fn bind_with_fallback(mut addr: SocketAddr) -> io::Result { - debug!(%addr, "binding"); +/// IP transport configuration +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub enum Config { + /// Default IPv4 binding + V4Default { + /// TODO + ip_addr: Ipv4Addr, + /// The port to bind on + port: u16, + }, + /// Default IPv6 binding + V6Default { + /// TODO + ip_addr: Ipv6Addr, + /// The scope_id + scope_id: u32, + /// The port to bind on + port: u16, + }, + /// IPv4 binding + V4 { + /// TODO + ip_addr: Ipv4Net, + /// The port to bind on + port: u16, + }, + /// IPv6 binding + V6 { + /// TODO + ip_addr: Ipv6Net, + /// The scope id + scope_id: u32, + /// The port to bind on + port: u16, + }, +} + +impl Config { + /// Is this a v4 config. + pub fn is_ipv4(&self) -> bool { + matches!(self, Self::V4Default { .. } | Self::V4 { .. }) + } + + /// Is this a v6 config. + pub fn is_ipv6(&self) -> bool { + matches!(self, Self::V6Default { .. } | Self::V6 { .. }) + } + /// Is this a default config? + pub fn is_default(&self) -> bool { + matches!(self, Self::V4Default { .. } | Self::V6Default { .. }) + } + + /// TODO + pub fn is_valid_send_addr(&self, dst: SocketAddr) -> bool { + match self { + Self::V4Default { .. } => matches!(dst, SocketAddr::V4(_)), + Self::V6Default { .. } => matches!(dst, SocketAddr::V6(_)), + Self::V4 { ip_addr, .. } => match dst { + SocketAddr::V4(dst_v4) => ip_addr.contains(dst_v4.ip()), + SocketAddr::V6(_) => false, + }, + Self::V6 { + ip_addr, scope_id, .. + } => match dst { + SocketAddr::V6(dst_v6) => { + if ip_addr.contains(dst_v6.ip()) { + return true; + } + if dst_v6.ip().is_unicast_link_local() { + // If we have a link local interface, use the scope id + if *scope_id == dst_v6.scope_id() { + return true; + } + } + false + } + SocketAddr::V4(_) => false, + }, + } + } +} + +impl From for SocketAddr { + fn from(value: Config) -> Self { + match value { + Config::V4Default { ip_addr, port } => SocketAddr::V4(SocketAddrV4::new(ip_addr, port)), + Config::V6Default { + ip_addr, + scope_id, + port, + } => SocketAddr::V6(SocketAddrV6::new(ip_addr, port, 0, scope_id)), + Config::V4 { ip_addr, port } => SocketAddr::V4(SocketAddrV4::new(ip_addr.addr(), port)), + Config::V6 { + ip_addr, + scope_id, + port, + } => SocketAddr::V6(SocketAddrV6::new(ip_addr.addr(), port, 0, scope_id)), + } + } +} + +fn bind_with_fallback(mut addr: SocketAddr) -> io::Result { + debug!(?addr, "binding"); // First try binding a preferred port, if specified match netwatch::UdpSocket::bind_full(addr) { Ok(socket) => { @@ -47,13 +150,13 @@ fn bind_with_fallback(mut addr: SocketAddr) -> io::Result { } impl IpTransport { - pub(crate) fn bind(bind_addr: SocketAddr, metrics: Arc) -> io::Result { - let socket = bind_with_fallback(bind_addr)?; - Ok(Self::new(bind_addr, Arc::new(socket), metrics.clone())) + pub(crate) fn bind(config: Config, metrics: Arc) -> io::Result { + let socket = bind_with_fallback(config.into())?; + Ok(Self::new(config, Arc::new(socket), metrics.clone())) } pub(crate) fn new( - bind_addr: SocketAddr, + config: Config, socket: Arc, metrics: Arc, ) -> Self { @@ -62,7 +165,7 @@ impl IpTransport { let local_addr = Watchable::new(socket.local_addr().expect("invalid socket")); Self { - bind_addr, + config, socket, local_addr, metrics, @@ -119,7 +222,7 @@ impl IpTransport { } pub(crate) fn bind_addr(&self) -> SocketAddr { - self.bind_addr + self.config.into() } pub(super) fn create_network_change_sender(&self) -> IpNetworkChangeSender { @@ -132,7 +235,7 @@ impl IpTransport { pub(super) fn create_sender(&self) -> IpSender { let sender = self.socket.clone().create_sender(); IpSender { - bind_addr: self.bind_addr, + config: self.config, sender, metrics: self.metrics.clone(), } @@ -164,7 +267,7 @@ impl IpNetworkChangeSender { #[derive(Debug, Clone)] #[pin_project] pub(super) struct IpSender { - bind_addr: SocketAddr, + config: Config, #[pin] sender: UdpSender, metrics: Arc, @@ -173,16 +276,12 @@ pub(super) struct IpSender { impl IpSender { pub(super) fn is_valid_send_addr(&self, dst: &SocketAddr) -> bool { // Our net-tools crate binds sockets to their specific family. This means an IPv6 - // socket can not sent to IPv4, on any platform. So we need to convert and + // socket can not sent to IPv4, on any platform. So we need to convert an // IPv4-mapped IPv6 address back to it's canonical IPv4 address. - let dst_ip = dst.ip().to_canonical(); + let mut dst = *dst; + dst.set_ip(dst.ip().to_canonical()); - #[allow(clippy::match_like_matches_macro)] - match (self.bind_addr.ip(), dst_ip) { - (IpAddr::V4(_), IpAddr::V4(_)) => true, - (IpAddr::V6(_), IpAddr::V6(_)) => true, - _ => false, - } + self.config.is_valid_send_addr(dst) } /// Creates a canonical socket address. From c0cfa7b2c286644b06739e7a7f9e75476c5221c2 Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Sat, 22 Nov 2025 10:27:26 +0100 Subject: [PATCH 2/8] fix: restructure ip transports --- iroh/src/magicsock/transports.rs | 179 +++++++++++++++++++++++++++---- 1 file changed, 156 insertions(+), 23 deletions(-) diff --git a/iroh/src/magicsock/transports.rs b/iroh/src/magicsock/transports.rs index e0a399a090..7215807efe 100644 --- a/iroh/src/magicsock/transports.rs +++ b/iroh/src/magicsock/transports.rs @@ -34,8 +34,16 @@ pub(crate) use self::relay::{RelayActorConfig, RelayTransport}; /// can support. #[derive(Debug)] pub(crate) struct Transports { + /// Default sender for v4 #[cfg(not(wasm_browser))] - ip: Vec, + ip_v4_default: Option, + #[cfg(not(wasm_browser))] + ip_v4: Vec, + /// Default sender for v6 + #[cfg(not(wasm_browser))] + ip_v6_default: Option, + #[cfg(not(wasm_browser))] + ip_v6: Vec, relay: Vec, poll_recv_counter: usize, @@ -101,14 +109,50 @@ impl TransportConfig { } } +/// Returns (default_ipv4, ipv4, default_ipv6, ipv6) transports. #[cfg(not(wasm_browser))] -fn bind_ip(configs: &[TransportConfig], metrics: &EndpointMetrics) -> io::Result> { - let mut transports = Vec::new(); +#[allow(clippy::type_complexity)] +fn bind_ip( + configs: &[TransportConfig], + metrics: &EndpointMetrics, +) -> io::Result<( + Option, + Vec, + Option, + Vec, +)> { + let mut ip_v4_default = None; + let mut ip_v4 = Vec::new(); + let mut ip_v6_default = None; + let mut ip_v6 = Vec::new(); + for config in configs { if let TransportConfig::Ip(config) = config { match IpTransport::bind(*config, metrics.magicsock.clone()) { Ok(transport) => { - transports.push(transport); + if config.is_ipv4() { + if config.is_default() { + if ip_v4_default.is_some() { + return Err(io::Error::other( + "can only have a single IPv4 default transport", + )); + } + ip_v4_default = Some(transport); + } else { + ip_v4.push(transport); + } + } else if config.is_ipv6() { + if config.is_default() { + if ip_v6_default.is_some() { + return Err(io::Error::other( + "can only have a single IPv6 default transport", + )); + } + ip_v6_default = Some(transport); + } else { + ip_v6.push(transport); + } + } } Err(err) => { if config.is_ipv6() { @@ -121,7 +165,7 @@ fn bind_ip(configs: &[TransportConfig], metrics: &EndpointMetrics) -> io::Result } } - Ok(transports) + Ok((ip_v4_default, ip_v4, ip_v6_default, ip_v6)) } impl Transports { @@ -133,7 +177,7 @@ impl Transports { shutdown_token: CancellationToken, ) -> io::Result { #[cfg(not(wasm_browser))] - let ip = bind_ip(configs, metrics)?; + let (ip_v4_default, ip_v4, ip_v6_default, ip_v6) = bind_ip(configs, metrics)?; let relay = configs .iter() @@ -143,7 +187,13 @@ impl Transports { Ok(Self { #[cfg(not(wasm_browser))] - ip, + ip_v4_default, + #[cfg(not(wasm_browser))] + ip_v4, + #[cfg(not(wasm_browser))] + ip_v6_default, + #[cfg(not(wasm_browser))] + ip_v6, relay, poll_recv_counter: Default::default(), source_addrs: Default::default(), @@ -198,7 +248,19 @@ impl Transports { if counter % 2 == 0 { #[cfg(not(wasm_browser))] - for transport in &mut self.ip { + if let Some(ref mut transport) = self.ip_v4_default { + poll_transport!(transport); + } + #[cfg(not(wasm_browser))] + for transport in &mut self.ip_v4 { + poll_transport!(transport); + } + #[cfg(not(wasm_browser))] + if let Some(ref mut transport) = self.ip_v6_default { + poll_transport!(transport); + } + #[cfg(not(wasm_browser))] + for transport in &mut self.ip_v6 { poll_transport!(transport); } for transport in &mut self.relay { @@ -209,7 +271,19 @@ impl Transports { poll_transport!(transport); } #[cfg(not(wasm_browser))] - for transport in self.ip.iter_mut().rev() { + if let Some(ref mut transport) = self.ip_v6_default { + poll_transport!(transport); + } + #[cfg(not(wasm_browser))] + for transport in &mut self.ip_v6 { + poll_transport!(transport); + } + #[cfg(not(wasm_browser))] + if let Some(ref mut transport) = self.ip_v4_default { + poll_transport!(transport); + } + #[cfg(not(wasm_browser))] + for transport in &mut self.ip_v4 { poll_transport!(transport); } } @@ -225,10 +299,18 @@ impl Transports { self.local_addrs_watch().get() } + fn ip_transports(&self) -> impl Iterator { + self.ip_v4_default + .iter() + .chain(self.ip_v4.iter()) + .chain(self.ip_v6_default.iter()) + .chain(self.ip_v6.iter()) + } + /// Watch for all currently known local addresses. #[cfg(not(wasm_browser))] pub(crate) fn local_addrs_watch(&self) -> LocalAddrsWatch { - let ips = n0_watcher::Join::new(self.ip.iter().map(|t| t.local_addr_watch())); + let ips = n0_watcher::Join::new(self.ip_transports().map(|t| t.local_addr_watch())); let relays = n0_watcher::Join::new(self.relay.iter().map(|t| t.local_addr_watch())); ips.or(relays).map(|(ips, relays)| { @@ -254,12 +336,15 @@ impl Transports { /// Returns the bound addresses for IP based transports #[cfg(not(wasm_browser))] pub(crate) fn ip_bind_addrs(&self) -> Vec { - self.ip.iter().map(|t| t.bind_addr()).collect() + self.ip_transports().map(|t| t.bind_addr()).collect() } #[cfg(not(wasm_browser))] pub(crate) fn max_transmit_segments(&self) -> usize { - let res = self.ip.iter().map(|t| t.max_transmit_segments()).min(); + let res = self + .ip_transports() + .map(|t| t.max_transmit_segments()) + .min(); res.unwrap_or(1) } @@ -277,7 +362,7 @@ impl Transports { // but we never get data from both sockets at the same time in `poll_recv` // and it's impossible and unnecessary to be refactored that way. - let res = self.ip.iter().map(|t| t.max_receive_segments()).max(); + let res = self.ip_transports().map(|t| t.max_receive_segments()).max(); res.unwrap_or(1) } @@ -288,7 +373,7 @@ impl Transports { #[cfg(not(wasm_browser))] pub(crate) fn may_fragment(&self) -> bool { - self.ip.iter().any(|t| t.may_fragment()) + self.ip_transports().any(|t| t.may_fragment()) } #[cfg(wasm_browser)] @@ -298,13 +383,26 @@ impl Transports { pub(crate) fn create_sender(&self) -> TransportsSender { #[cfg(not(wasm_browser))] - let ip = self.ip.iter().map(|t| t.create_sender()).collect(); + let ip_v4_default = self.ip_v4_default.as_ref().map(|t| t.create_sender()); + #[cfg(not(wasm_browser))] + let ip_v4 = self.ip_v4.iter().map(|t| t.create_sender()).collect(); + #[cfg(not(wasm_browser))] + let ip_v6_default = self.ip_v6_default.as_ref().map(|t| t.create_sender()); + #[cfg(not(wasm_browser))] + let ip_v6 = self.ip_v6.iter().map(|t| t.create_sender()).collect(); + let relay = self.relay.iter().map(|t| t.create_sender()).collect(); let max_transmit_segments = self.max_transmit_segments(); TransportsSender { #[cfg(not(wasm_browser))] - ip, + ip_v4_default, + #[cfg(not(wasm_browser))] + ip_v4, + #[cfg(not(wasm_browser))] + ip_v6_default, + #[cfg(not(wasm_browser))] + ip_v6, relay, max_transmit_segments, } @@ -315,8 +413,7 @@ impl Transports { NetworkChangeSender { #[cfg(not(wasm_browser))] ip: self - .ip - .iter() + .ip_transports() .map(|t| t.create_network_change_sender()) .collect(), relay: self @@ -489,13 +586,29 @@ impl Addr { /// A sender that sends to all our transports. #[derive(Debug, Clone)] pub(crate) struct TransportsSender { + /// Default sender for v4 + #[cfg(not(wasm_browser))] + ip_v4_default: Option, + #[cfg(not(wasm_browser))] + ip_v4: Vec, + /// Default sender for v6 #[cfg(not(wasm_browser))] - ip: Vec, + ip_v6_default: Option, + #[cfg(not(wasm_browser))] + ip_v6: Vec, relay: Vec, max_transmit_segments: usize, } impl TransportsSender { + fn ip_v4_senders_mut(&mut self) -> impl Iterator { + self.ip_v4.iter_mut().chain(self.ip_v4_default.iter_mut()) + } + + fn ip_v6_senders_mut(&mut self) -> impl Iterator { + self.ip_v6.iter_mut().chain(self.ip_v6_default.iter_mut()) + } + #[instrument(name = "poll_send", skip(self, cx, transmit), fields(len = transmit.contents.len()))] pub(crate) fn poll_send( mut self: Pin<&mut Self>, @@ -510,9 +623,12 @@ impl TransportsSender { return Poll::Ready(Err(io::Error::other("IP is unsupported in browser"))); } #[cfg(not(wasm_browser))] - Addr::Ip(addr) => { - for sender in &mut self.ip { - if sender.is_valid_send_addr(addr) { + Addr::Ip(addr) => match addr { + SocketAddr::V4(_) => { + for sender in self + .ip_v4_senders_mut() + .filter(|s| s.is_valid_send_addr(addr)) + { match Pin::new(sender).poll_send(cx, *addr, src, transmit) { Poll::Pending => {} Poll::Ready(res) => { @@ -525,7 +641,24 @@ impl TransportsSender { } } } - } + SocketAddr::V6(_) => { + for sender in self + .ip_v6_senders_mut() + .filter(|s| s.is_valid_send_addr(addr)) + { + match Pin::new(sender).poll_send(cx, *addr, src, transmit) { + Poll::Pending => {} + Poll::Ready(res) => { + match &res { + Ok(()) => trace!("sent"), + Err(err) => trace!("send failed: {err:#}"), + } + return Poll::Ready(res); + } + } + } + } + }, Addr::Relay(url, endpoint_id) => { for sender in &mut self.relay { if sender.is_valid_send_addr(url, endpoint_id) { From 6e4c7850f28fa8efdead929d09fb62b4111ba0ad Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Sat, 22 Nov 2025 10:51:39 +0100 Subject: [PATCH 3/8] cleanup structure --- iroh/src/magicsock/transports.rs | 242 +++++++++++++++---------------- 1 file changed, 117 insertions(+), 125 deletions(-) diff --git a/iroh/src/magicsock/transports.rs b/iroh/src/magicsock/transports.rs index 7215807efe..844da36fe1 100644 --- a/iroh/src/magicsock/transports.rs +++ b/iroh/src/magicsock/transports.rs @@ -34,16 +34,8 @@ pub(crate) use self::relay::{RelayActorConfig, RelayTransport}; /// can support. #[derive(Debug)] pub(crate) struct Transports { - /// Default sender for v4 - #[cfg(not(wasm_browser))] - ip_v4_default: Option, - #[cfg(not(wasm_browser))] - ip_v4: Vec, - /// Default sender for v6 #[cfg(not(wasm_browser))] - ip_v6_default: Option, - #[cfg(not(wasm_browser))] - ip_v6: Vec, + ip: IpTransports, relay: Vec, poll_recv_counter: usize, @@ -109,63 +101,91 @@ impl TransportConfig { } } -/// Returns (default_ipv4, ipv4, default_ipv6, ipv6) transports. #[cfg(not(wasm_browser))] -#[allow(clippy::type_complexity)] -fn bind_ip( - configs: &[TransportConfig], - metrics: &EndpointMetrics, -) -> io::Result<( - Option, - Vec, - Option, - Vec, -)> { - let mut ip_v4_default = None; - let mut ip_v4 = Vec::new(); - let mut ip_v6_default = None; - let mut ip_v6 = Vec::new(); - - for config in configs { - if let TransportConfig::Ip(config) = config { - match IpTransport::bind(*config, metrics.magicsock.clone()) { - Ok(transport) => { - if config.is_ipv4() { - if config.is_default() { - if ip_v4_default.is_some() { - return Err(io::Error::other( - "can only have a single IPv4 default transport", - )); +#[derive(Debug)] +struct IpTransports { + v4_default: Option, + v4: Vec, + v6_default: Option, + v6: Vec, +} + +#[cfg(not(wasm_browser))] +impl IpTransports { + fn create_sender(&self) -> IpTransportsSender { + let ip_v4_default = self.v4_default.as_ref().map(|t| t.create_sender()); + let ip_v4 = self.v4.iter().map(|t| t.create_sender()).collect(); + let ip_v6_default = self.v6_default.as_ref().map(|t| t.create_sender()); + let ip_v6 = self.v6.iter().map(|t| t.create_sender()).collect(); + + IpTransportsSender { + v4_default: ip_v4_default, + v4: ip_v4, + v6_default: ip_v6_default, + v6: ip_v6, + } + } + + fn iter(&self) -> impl Iterator { + self.v4_default + .iter() + .chain(self.v4.iter()) + .chain(self.v6_default.iter()) + .chain(self.v6.iter()) + } + + fn bind(configs: &[TransportConfig], metrics: &EndpointMetrics) -> io::Result { + let mut ip_v4_default = None; + let mut ip_v4 = Vec::new(); + let mut ip_v6_default = None; + let mut ip_v6 = Vec::new(); + + for config in configs { + if let TransportConfig::Ip(config) = config { + match IpTransport::bind(*config, metrics.magicsock.clone()) { + Ok(transport) => { + if config.is_ipv4() { + if config.is_default() { + if ip_v4_default.is_some() { + return Err(io::Error::other( + "can only have a single IPv4 default transport", + )); + } + ip_v4_default = Some(transport); + } else { + ip_v4.push(transport); } - ip_v4_default = Some(transport); - } else { - ip_v4.push(transport); - } - } else if config.is_ipv6() { - if config.is_default() { - if ip_v6_default.is_some() { - return Err(io::Error::other( - "can only have a single IPv6 default transport", - )); + } else if config.is_ipv6() { + if config.is_default() { + if ip_v6_default.is_some() { + return Err(io::Error::other( + "can only have a single IPv6 default transport", + )); + } + ip_v6_default = Some(transport); + } else { + ip_v6.push(transport); } - ip_v6_default = Some(transport); - } else { - ip_v6.push(transport); } } - } - Err(err) => { - if config.is_ipv6() { - tracing::info!("bind ignoring IPv6 bind failure: {:?}", err); - } else { - return Err(err); + Err(err) => { + if config.is_ipv6() { + tracing::info!("bind ignoring IPv6 bind failure: {:?}", err); + } else { + return Err(err); + } } } } } - } - Ok((ip_v4_default, ip_v4, ip_v6_default, ip_v6)) + Ok(Self { + v4_default: ip_v4_default, + v4: ip_v4, + v6_default: ip_v6_default, + v6: ip_v6, + }) + } } impl Transports { @@ -177,7 +197,7 @@ impl Transports { shutdown_token: CancellationToken, ) -> io::Result { #[cfg(not(wasm_browser))] - let (ip_v4_default, ip_v4, ip_v6_default, ip_v6) = bind_ip(configs, metrics)?; + let ip = IpTransports::bind(configs, metrics)?; let relay = configs .iter() @@ -187,13 +207,7 @@ impl Transports { Ok(Self { #[cfg(not(wasm_browser))] - ip_v4_default, - #[cfg(not(wasm_browser))] - ip_v4, - #[cfg(not(wasm_browser))] - ip_v6_default, - #[cfg(not(wasm_browser))] - ip_v6, + ip, relay, poll_recv_counter: Default::default(), source_addrs: Default::default(), @@ -248,19 +262,19 @@ impl Transports { if counter % 2 == 0 { #[cfg(not(wasm_browser))] - if let Some(ref mut transport) = self.ip_v4_default { + if let Some(ref mut transport) = self.ip.v4_default { poll_transport!(transport); } #[cfg(not(wasm_browser))] - for transport in &mut self.ip_v4 { + for transport in &mut self.ip.v4 { poll_transport!(transport); } #[cfg(not(wasm_browser))] - if let Some(ref mut transport) = self.ip_v6_default { + if let Some(ref mut transport) = self.ip.v6_default { poll_transport!(transport); } #[cfg(not(wasm_browser))] - for transport in &mut self.ip_v6 { + for transport in &mut self.ip.v6 { poll_transport!(transport); } for transport in &mut self.relay { @@ -271,19 +285,19 @@ impl Transports { poll_transport!(transport); } #[cfg(not(wasm_browser))] - if let Some(ref mut transport) = self.ip_v6_default { + if let Some(ref mut transport) = self.ip.v6_default { poll_transport!(transport); } #[cfg(not(wasm_browser))] - for transport in &mut self.ip_v6 { + for transport in &mut self.ip.v6 { poll_transport!(transport); } #[cfg(not(wasm_browser))] - if let Some(ref mut transport) = self.ip_v4_default { + if let Some(ref mut transport) = self.ip.v4_default { poll_transport!(transport); } #[cfg(not(wasm_browser))] - for transport in &mut self.ip_v4 { + for transport in &mut self.ip.v4 { poll_transport!(transport); } } @@ -299,18 +313,10 @@ impl Transports { self.local_addrs_watch().get() } - fn ip_transports(&self) -> impl Iterator { - self.ip_v4_default - .iter() - .chain(self.ip_v4.iter()) - .chain(self.ip_v6_default.iter()) - .chain(self.ip_v6.iter()) - } - /// Watch for all currently known local addresses. #[cfg(not(wasm_browser))] pub(crate) fn local_addrs_watch(&self) -> LocalAddrsWatch { - let ips = n0_watcher::Join::new(self.ip_transports().map(|t| t.local_addr_watch())); + let ips = n0_watcher::Join::new(self.ip.iter().map(|t| t.local_addr_watch())); let relays = n0_watcher::Join::new(self.relay.iter().map(|t| t.local_addr_watch())); ips.or(relays).map(|(ips, relays)| { @@ -336,15 +342,12 @@ impl Transports { /// Returns the bound addresses for IP based transports #[cfg(not(wasm_browser))] pub(crate) fn ip_bind_addrs(&self) -> Vec { - self.ip_transports().map(|t| t.bind_addr()).collect() + self.ip.iter().map(|t| t.bind_addr()).collect() } #[cfg(not(wasm_browser))] pub(crate) fn max_transmit_segments(&self) -> usize { - let res = self - .ip_transports() - .map(|t| t.max_transmit_segments()) - .min(); + let res = self.ip.iter().map(|t| t.max_transmit_segments()).min(); res.unwrap_or(1) } @@ -362,7 +365,7 @@ impl Transports { // but we never get data from both sockets at the same time in `poll_recv` // and it's impossible and unnecessary to be refactored that way. - let res = self.ip_transports().map(|t| t.max_receive_segments()).max(); + let res = self.ip.iter().map(|t| t.max_receive_segments()).max(); res.unwrap_or(1) } @@ -373,7 +376,7 @@ impl Transports { #[cfg(not(wasm_browser))] pub(crate) fn may_fragment(&self) -> bool { - self.ip_transports().any(|t| t.may_fragment()) + self.ip.iter().any(|t| t.may_fragment()) } #[cfg(wasm_browser)] @@ -383,26 +386,14 @@ impl Transports { pub(crate) fn create_sender(&self) -> TransportsSender { #[cfg(not(wasm_browser))] - let ip_v4_default = self.ip_v4_default.as_ref().map(|t| t.create_sender()); - #[cfg(not(wasm_browser))] - let ip_v4 = self.ip_v4.iter().map(|t| t.create_sender()).collect(); - #[cfg(not(wasm_browser))] - let ip_v6_default = self.ip_v6_default.as_ref().map(|t| t.create_sender()); - #[cfg(not(wasm_browser))] - let ip_v6 = self.ip_v6.iter().map(|t| t.create_sender()).collect(); + let ip = self.ip.create_sender(); let relay = self.relay.iter().map(|t| t.create_sender()).collect(); let max_transmit_segments = self.max_transmit_segments(); TransportsSender { #[cfg(not(wasm_browser))] - ip_v4_default, - #[cfg(not(wasm_browser))] - ip_v4, - #[cfg(not(wasm_browser))] - ip_v6_default, - #[cfg(not(wasm_browser))] - ip_v6, + ip, relay, max_transmit_segments, } @@ -413,7 +404,8 @@ impl Transports { NetworkChangeSender { #[cfg(not(wasm_browser))] ip: self - .ip_transports() + .ip + .iter() .map(|t| t.create_network_change_sender()) .collect(), relay: self @@ -586,29 +578,35 @@ impl Addr { /// A sender that sends to all our transports. #[derive(Debug, Clone)] pub(crate) struct TransportsSender { - /// Default sender for v4 - #[cfg(not(wasm_browser))] - ip_v4_default: Option, #[cfg(not(wasm_browser))] - ip_v4: Vec, - /// Default sender for v6 - #[cfg(not(wasm_browser))] - ip_v6_default: Option, - #[cfg(not(wasm_browser))] - ip_v6: Vec, + ip: IpTransportsSender, relay: Vec, max_transmit_segments: usize, } -impl TransportsSender { - fn ip_v4_senders_mut(&mut self) -> impl Iterator { - self.ip_v4.iter_mut().chain(self.ip_v4_default.iter_mut()) +#[cfg(not(wasm_browser))] +#[derive(Debug, Clone)] +struct IpTransportsSender { + /// Default sender for v4 + v4_default: Option, + v4: Vec, + /// Default sender for v6 + v6_default: Option, + v6: Vec, +} + +#[cfg(not(wasm_browser))] +impl IpTransportsSender { + fn v4_iter_mut(&mut self) -> impl Iterator { + self.v4.iter_mut().chain(self.v4_default.iter_mut()) } - fn ip_v6_senders_mut(&mut self) -> impl Iterator { - self.ip_v6.iter_mut().chain(self.ip_v6_default.iter_mut()) + fn v6_iter_mut(&mut self) -> impl Iterator { + self.v6.iter_mut().chain(self.v6_default.iter_mut()) } +} +impl TransportsSender { #[instrument(name = "poll_send", skip(self, cx, transmit), fields(len = transmit.contents.len()))] pub(crate) fn poll_send( mut self: Pin<&mut Self>, @@ -625,10 +623,7 @@ impl TransportsSender { #[cfg(not(wasm_browser))] Addr::Ip(addr) => match addr { SocketAddr::V4(_) => { - for sender in self - .ip_v4_senders_mut() - .filter(|s| s.is_valid_send_addr(addr)) - { + for sender in self.ip.v4_iter_mut().filter(|s| s.is_valid_send_addr(addr)) { match Pin::new(sender).poll_send(cx, *addr, src, transmit) { Poll::Pending => {} Poll::Ready(res) => { @@ -642,10 +637,7 @@ impl TransportsSender { } } SocketAddr::V6(_) => { - for sender in self - .ip_v6_senders_mut() - .filter(|s| s.is_valid_send_addr(addr)) - { + for sender in self.ip.v6_iter_mut().filter(|s| s.is_valid_send_addr(addr)) { match Pin::new(sender).poll_send(cx, *addr, src, transmit) { Poll::Pending => {} Poll::Ready(res) => { From 3978f6b634178b8e23ccc338788472f19de9f9c0 Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Sat, 22 Nov 2025 11:01:15 +0100 Subject: [PATCH 4/8] cleanup code some more --- iroh/src/magicsock/transports.rs | 157 +++------------------------- iroh/src/magicsock/transports/ip.rs | 153 ++++++++++++++++++++++++++- 2 files changed, 165 insertions(+), 145 deletions(-) diff --git a/iroh/src/magicsock/transports.rs b/iroh/src/magicsock/transports.rs index 844da36fe1..030d5fbd55 100644 --- a/iroh/src/magicsock/transports.rs +++ b/iroh/src/magicsock/transports.rs @@ -25,9 +25,7 @@ mod relay; #[cfg(not(wasm_browser))] pub use self::ip::Config as IpConfig; #[cfg(not(wasm_browser))] -pub(crate) use self::ip::IpTransport; -#[cfg(not(wasm_browser))] -use self::ip::{IpNetworkChangeSender, IpSender}; +use self::ip::{IpNetworkChangeSender, IpTransports, IpTransportsSender}; pub(crate) use self::relay::{RelayActorConfig, RelayTransport}; /// Manages the different underlying data transports that the magicsock @@ -101,93 +99,6 @@ impl TransportConfig { } } -#[cfg(not(wasm_browser))] -#[derive(Debug)] -struct IpTransports { - v4_default: Option, - v4: Vec, - v6_default: Option, - v6: Vec, -} - -#[cfg(not(wasm_browser))] -impl IpTransports { - fn create_sender(&self) -> IpTransportsSender { - let ip_v4_default = self.v4_default.as_ref().map(|t| t.create_sender()); - let ip_v4 = self.v4.iter().map(|t| t.create_sender()).collect(); - let ip_v6_default = self.v6_default.as_ref().map(|t| t.create_sender()); - let ip_v6 = self.v6.iter().map(|t| t.create_sender()).collect(); - - IpTransportsSender { - v4_default: ip_v4_default, - v4: ip_v4, - v6_default: ip_v6_default, - v6: ip_v6, - } - } - - fn iter(&self) -> impl Iterator { - self.v4_default - .iter() - .chain(self.v4.iter()) - .chain(self.v6_default.iter()) - .chain(self.v6.iter()) - } - - fn bind(configs: &[TransportConfig], metrics: &EndpointMetrics) -> io::Result { - let mut ip_v4_default = None; - let mut ip_v4 = Vec::new(); - let mut ip_v6_default = None; - let mut ip_v6 = Vec::new(); - - for config in configs { - if let TransportConfig::Ip(config) = config { - match IpTransport::bind(*config, metrics.magicsock.clone()) { - Ok(transport) => { - if config.is_ipv4() { - if config.is_default() { - if ip_v4_default.is_some() { - return Err(io::Error::other( - "can only have a single IPv4 default transport", - )); - } - ip_v4_default = Some(transport); - } else { - ip_v4.push(transport); - } - } else if config.is_ipv6() { - if config.is_default() { - if ip_v6_default.is_some() { - return Err(io::Error::other( - "can only have a single IPv6 default transport", - )); - } - ip_v6_default = Some(transport); - } else { - ip_v6.push(transport); - } - } - } - Err(err) => { - if config.is_ipv6() { - tracing::info!("bind ignoring IPv6 bind failure: {:?}", err); - } else { - return Err(err); - } - } - } - } - } - - Ok(Self { - v4_default: ip_v4_default, - v4: ip_v4, - v6_default: ip_v6_default, - v6: ip_v6, - }) - } -} - impl Transports { /// Binds the transports. pub(crate) fn bind( @@ -197,7 +108,16 @@ impl Transports { shutdown_token: CancellationToken, ) -> io::Result { #[cfg(not(wasm_browser))] - let ip = IpTransports::bind(configs, metrics)?; + let ip = IpTransports::bind( + configs.iter().filter_map(|c| { + if let TransportConfig::Ip(config) = c { + Some(*config) + } else { + None + } + }), + metrics, + )?; let relay = configs .iter() @@ -262,21 +182,8 @@ impl Transports { if counter % 2 == 0 { #[cfg(not(wasm_browser))] - if let Some(ref mut transport) = self.ip.v4_default { - poll_transport!(transport); - } - #[cfg(not(wasm_browser))] - for transport in &mut self.ip.v4 { - poll_transport!(transport); - } - #[cfg(not(wasm_browser))] - if let Some(ref mut transport) = self.ip.v6_default { - poll_transport!(transport); - } - #[cfg(not(wasm_browser))] - for transport in &mut self.ip.v6 { - poll_transport!(transport); - } + poll_transport!(&mut self.ip); + for transport in &mut self.relay { poll_transport!(transport); } @@ -285,21 +192,7 @@ impl Transports { poll_transport!(transport); } #[cfg(not(wasm_browser))] - if let Some(ref mut transport) = self.ip.v6_default { - poll_transport!(transport); - } - #[cfg(not(wasm_browser))] - for transport in &mut self.ip.v6 { - poll_transport!(transport); - } - #[cfg(not(wasm_browser))] - if let Some(ref mut transport) = self.ip.v4_default { - poll_transport!(transport); - } - #[cfg(not(wasm_browser))] - for transport in &mut self.ip.v4 { - poll_transport!(transport); - } + poll_transport!(&mut self.ip); } Poll::Pending @@ -584,28 +477,6 @@ pub(crate) struct TransportsSender { max_transmit_segments: usize, } -#[cfg(not(wasm_browser))] -#[derive(Debug, Clone)] -struct IpTransportsSender { - /// Default sender for v4 - v4_default: Option, - v4: Vec, - /// Default sender for v6 - v6_default: Option, - v6: Vec, -} - -#[cfg(not(wasm_browser))] -impl IpTransportsSender { - fn v4_iter_mut(&mut self) -> impl Iterator { - self.v4.iter_mut().chain(self.v4_default.iter_mut()) - } - - fn v6_iter_mut(&mut self) -> impl Iterator { - self.v6.iter_mut().chain(self.v6_default.iter_mut()) - } -} - impl TransportsSender { #[instrument(name = "poll_send", skip(self, cx, transmit), fields(len = transmit.contents.len()))] pub(crate) fn poll_send( diff --git a/iroh/src/magicsock/transports/ip.rs b/iroh/src/magicsock/transports/ip.rs index 745cab5a5d..e910012585 100644 --- a/iroh/src/magicsock/transports/ip.rs +++ b/iroh/src/magicsock/transports/ip.rs @@ -7,14 +7,13 @@ use std::{ }; use n0_watcher::Watchable; -#[cfg(not(wasm_browser))] use netdev::ipnet::{Ipv4Net, Ipv6Net}; use netwatch::{UdpSender, UdpSocket}; use pin_project::pin_project; use tracing::{debug, trace}; use super::{Addr, Transmit}; -use crate::metrics::MagicsockMetrics; +use crate::metrics::{EndpointMetrics, MagicsockMetrics}; #[derive(Debug)] pub(crate) struct IpTransport { @@ -330,3 +329,153 @@ impl IpSender { } } } + +#[derive(Debug, Clone)] +pub(super) struct IpTransportsSender { + /// Default sender for v4 + v4_default: Option, + v4: Vec, + /// Default sender for v6 + v6_default: Option, + v6: Vec, +} + +impl IpTransportsSender { + pub(super) fn v4_iter(&self) -> impl Iterator { + self.v4.iter().chain(self.v4_default.iter()) + } + + pub(super) fn v6_iter(&self) -> impl Iterator { + self.v6.iter().chain(self.v6_default.iter()) + } + pub(super) fn v4_iter_mut(&mut self) -> impl Iterator { + self.v4.iter_mut().chain(self.v4_default.iter_mut()) + } + + pub(super) fn v6_iter_mut(&mut self) -> impl Iterator { + self.v6.iter_mut().chain(self.v6_default.iter_mut()) + } +} + +#[derive(Debug)] +pub(super) struct IpTransports { + v4_default: Option, + v4: Vec, + v6_default: Option, + v6: Vec, +} + +impl IpTransports { + pub(super) fn create_sender(&self) -> IpTransportsSender { + let ip_v4_default = self.v4_default.as_ref().map(|t| t.create_sender()); + let ip_v4 = self.v4.iter().map(|t| t.create_sender()).collect(); + let ip_v6_default = self.v6_default.as_ref().map(|t| t.create_sender()); + let ip_v6 = self.v6.iter().map(|t| t.create_sender()).collect(); + + IpTransportsSender { + v4_default: ip_v4_default, + v4: ip_v4, + v6_default: ip_v6_default, + v6: ip_v6, + } + } + + pub(super) fn iter(&self) -> impl Iterator { + self.v4_default + .iter() + .chain(self.v4.iter()) + .chain(self.v6_default.iter()) + .chain(self.v6.iter()) + } + + pub(super) fn bind( + configs: impl Iterator, + metrics: &EndpointMetrics, + ) -> io::Result { + let mut ip_v4_default = None; + let mut ip_v4 = Vec::new(); + let mut ip_v6_default = None; + let mut ip_v6 = Vec::new(); + + for config in configs { + match IpTransport::bind(config, metrics.magicsock.clone()) { + Ok(transport) => { + if config.is_ipv4() { + if config.is_default() { + if ip_v4_default.is_some() { + return Err(io::Error::other( + "can only have a single IPv4 default transport", + )); + } + ip_v4_default = Some(transport); + } else { + ip_v4.push(transport); + } + } else if config.is_ipv6() { + if config.is_default() { + if ip_v6_default.is_some() { + return Err(io::Error::other( + "can only have a single IPv6 default transport", + )); + } + ip_v6_default = Some(transport); + } else { + ip_v6.push(transport); + } + } + } + Err(err) => { + if config.is_ipv6() { + tracing::info!("bind ignoring IPv6 bind failure: {:?}", err); + } else { + return Err(err); + } + } + } + } + + Ok(Self { + v4_default: ip_v4_default, + v4: ip_v4, + v6_default: ip_v6_default, + v6: ip_v6, + }) + } + + pub(super) fn poll_recv( + &mut self, + cx: &mut Context, + bufs: &mut [io::IoSliceMut<'_>], + metas: &mut [quinn_udp::RecvMeta], + source_addrs: &mut [Addr], + ) -> Poll> { + macro_rules! poll_transport { + ($socket:expr) => { + match $socket.poll_recv(cx, bufs, metas, source_addrs)? { + Poll::Pending | Poll::Ready(0) => {} + Poll::Ready(n) => { + return Poll::Ready(Ok(n)); + } + } + }; + } + + if let Some(ref mut transport) = self.v4_default { + poll_transport!(transport); + } + + for transport in &mut self.v4 { + poll_transport!(transport); + } + + if let Some(ref mut transport) = self.v6_default { + poll_transport!(transport); + } + + for transport in &mut self.v6 { + poll_transport!(transport); + } + + Poll::Pending + } +} From 262a3d2fb92ccc03087e80d04a2416ba4fcb582c Mon Sep 17 00:00:00 2001 From: Frando Date: Sat, 22 Nov 2025 13:24:43 +0100 Subject: [PATCH 5/8] fixup address mapping --- iroh/src/magicsock/transports.rs | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/iroh/src/magicsock/transports.rs b/iroh/src/magicsock/transports.rs index 030d5fbd55..df1e8bf1b5 100644 --- a/iroh/src/magicsock/transports.rs +++ b/iroh/src/magicsock/transports.rs @@ -722,7 +722,11 @@ impl quinn::UdpSender for MagicSender { } } } - MultipathMappedAddr::Ip(socket_addr) => Addr::Ip(socket_addr), + MultipathMappedAddr::Ip(socket_addr) => { + let socket_addr = + SocketAddr::new(socket_addr.ip().to_canonical(), socket_addr.port()); + Addr::Ip(socket_addr) + } }; let transmit = Transmit { @@ -738,7 +742,7 @@ impl quinn::UdpSender for MagicSender { { Poll::Ready(Ok(())) => Poll::Ready(Ok(())), Poll::Ready(Err(ref err)) => { - warn!("dropped transmit: {err:#}"); + warn!(?transport_addr, "dropped transmit: {err:#}"); Poll::Ready(Ok(())) } Poll::Pending => { @@ -746,7 +750,7 @@ impl quinn::UdpSender for MagicSender { // different transport. Instead we let Quinn handle this as a lost // datagram. // TODO: Revisit this: we might want to do something better. - trace!("transport pending, dropped transmit"); + trace!(?transport_addr, "transport pending, dropped transmit"); Poll::Ready(Ok(())) } } From 0c1c3a19d53a726f62617ac08675ce29941e6aa7 Mon Sep 17 00:00:00 2001 From: Frando Date: Sat, 22 Nov 2025 13:29:39 +0100 Subject: [PATCH 6/8] example: expand transfer.rs example to support explicit interface binds --- iroh/examples/transfer.rs | 142 +++++++++++++++++++++++++++++--------- 1 file changed, 111 insertions(+), 31 deletions(-) diff --git a/iroh/examples/transfer.rs b/iroh/examples/transfer.rs index 3b6c54eb7b..1e261a645a 100644 --- a/iroh/examples/transfer.rs +++ b/iroh/examples/transfer.rs @@ -1,5 +1,5 @@ use std::{ - net::SocketAddr, + net::{SocketAddr, SocketAddrV4, SocketAddrV6}, str::FromStr, time::{Duration, Instant}, }; @@ -20,6 +20,7 @@ use iroh::{ }; use n0_error::{Result, StackResultExt, StdResultExt}; use n0_future::task::AbortOnDropHandle; +use netdev::ipnet::{Ipv4Net, Ipv6Net}; use tokio_stream::StreamExt; use tracing::{info, warn}; use url::Url; @@ -112,6 +113,9 @@ struct EndpointArgs { /// Disable relays completely. #[clap(long, conflicts_with = "relay_url")] no_relay: bool, + /// Disable discovery completely. + #[clap(long, conflicts_with_all = ["pkarr_relay_url", "no_pkarr_publish", "dns_origin_domain", "no_dns_resolve"])] + no_discovery: bool, /// If set no direct connections will be established. #[clap(long)] relay_only: bool, @@ -133,6 +137,27 @@ struct EndpointArgs { #[clap(long)] /// Enable mDNS discovery. mdns: bool, + /// Set the default IPv4 bind address. + #[clap(long)] + bind_addr_v4: Option, + /// Set additional IPv4 bind addresses. + /// + /// Syntax is "addr/mask:port", so e.g. "10.0.0.1/16:1234". + /// The mask is used to define for which destinations this bind address is used. + #[clap(long)] + bind_addr_v4_additional: Vec, + /// Set the default IPv6 bind address. + #[clap(long)] + bind_addr_v6: Option, + /// Set additional IPv6 bind addresses. + /// + /// Syntax is "addr/mask:port", so e.g. "2001:db8::1/16:1234". + /// The mask is used to define for which destinations this bind address is used. + #[clap(long)] + bind_addr_v6_additional: Vec, + /// Disable all default bind addresses. + #[clap(long)] + no_default_bind: bool, } #[derive(Subcommand, Debug)] @@ -191,7 +216,14 @@ async fn main() -> Result<()> { impl EndpointArgs { async fn bind_endpoint(self) -> Result { - let mut builder = Endpoint::builder(); + let relay_mode = if self.no_relay { + RelayMode::Disabled + } else if !self.relay_url.is_empty() { + RelayMode::Custom(RelayMap::from_iter(self.relay_url)) + } else { + self.env.relay_mode() + }; + let mut builder = Endpoint::empty_builder(relay_mode); let secret_key = match std::env::var("IROH_SECRET") { Ok(s) => SecretKey::from_str(&s) @@ -218,31 +250,20 @@ impl EndpointArgs { } } - let relay_mode = if self.no_relay { - RelayMode::Disabled - } else if !self.relay_url.is_empty() { - RelayMode::Custom(RelayMap::from_iter(self.relay_url)) - } else { - self.env.relay_mode() - }; - builder = builder.relay_mode(relay_mode); - - if !self.no_pkarr_publish { - let url = self - .pkarr_relay_url - .unwrap_or_else(|| self.env.pkarr_relay_url()); - builder = builder.discovery(PkarrPublisher::builder(url)); - } - - if !self.no_dns_resolve { - let domain = self - .dns_origin_domain - .unwrap_or_else(|| self.env.dns_origin_domain()); - builder = builder.discovery(DnsDiscovery::builder(domain)); - } + if !self.no_discovery { + if !self.no_pkarr_publish { + let url = self + .pkarr_relay_url + .unwrap_or_else(|| self.env.pkarr_relay_url()); + builder = builder.discovery(PkarrPublisher::builder(url)); + } - if self.relay_only { - builder = builder.clear_ip_transports(); + if !self.no_dns_resolve { + let domain = self + .dns_origin_domain + .unwrap_or_else(|| self.env.dns_origin_domain()); + builder = builder.discovery(DnsDiscovery::builder(domain)); + } } if let Some(host) = self.dns_server { @@ -257,6 +278,28 @@ impl EndpointArgs { builder = builder.dns_resolver(DnsResolver::with_nameserver(addr)); } + if self.relay_only || self.no_default_bind { + builder = builder.clear_ip_transports(); + } + + if let Some(addr) = self.bind_addr_v4 { + builder = builder.bind_addr_v4_default(*addr.ip(), addr.port()); + } + for addr in self.bind_addr_v4_additional { + let (net, port) = parse_ipv4_net(&addr) + .with_context(|_| format!("invalid bind-addr-v4-additional: {addr}"))?; + builder = builder.bind_addr_v4(net, port); + } + + if let Some(addr) = self.bind_addr_v6 { + builder = builder.bind_addr_v6_default(*addr.ip(), 0, addr.port()); + } + for addr in self.bind_addr_v6_additional { + let (net, port) = parse_ipv6_net(&addr) + .with_context(|_| format!("invalid bind-addr-v6-additional: {addr}"))?; + builder = builder.bind_addr_v6(net, 0, port); + } + let endpoint = builder.alpns(vec![TRANSFER_ALPN.to_vec()]).bind().await?; if self.mdns { @@ -355,11 +398,14 @@ async fn provide(endpoint: Endpoint, size: u64) -> Result<()> { // We sent the last message, so wait for the client to close the connection once // it received this message. - let res = tokio::time::timeout(Duration::from_secs(3), async move { - let closed = conn.closed().await; - let remote = endpoint_id.fmt_short(); - if !matches!(closed, ConnectionError::ApplicationClosed(_)) { - println!("[{remote}] Endpoint disconnected with an error: {closed:#}"); + let res = tokio::time::timeout(Duration::from_secs(3), { + let conn = conn.clone(); + async move { + let closed = conn.closed().await; + let remote = endpoint_id.fmt_short(); + if !matches!(closed, ConnectionError::ApplicationClosed(_)) { + println!("[{remote}] Endpoint disconnected with an error: {closed:#}"); + } } }) .await; @@ -376,6 +422,16 @@ async fn provide(endpoint: Endpoint, size: u64) -> Result<()> { } else { println!("[{remote}] Disconnected"); } + println!("[{remote}] Path stats:"); + for path in conn.paths().get() { + let stats = path.stats(); + println!( + " {:?}: RTT {:?}, {} packets sent", + path.remote_addr(), + stats.rtt, + stats.sent_packets + ); + } n0_error::Ok(()) }); } @@ -422,6 +478,16 @@ async fn fetch(endpoint: Endpoint, remote_addr: EndpointAddr) -> Result<()> { time_to_first_byte.as_secs_f64(), chnk ); + println!("Path stats:"); + for path in conn.paths().get() { + let stats = path.stats(); + println!( + " {:?}: RTT {:?}, {} packets sent", + path.remote_addr(), + stats.rtt, + stats.sent_packets + ); + } Ok(()) } @@ -545,3 +611,17 @@ fn watch_conn_type( }); AbortOnDropHandle::new(task) } + +fn parse_ipv4_net(s: &str) -> Result<(Ipv4Net, u16)> { + let (net, port) = s.split_once(":").std_context("missing colon")?; + let net: Ipv4Net = net.parse().std_context("invalid net")?; + let port: u16 = port.parse().std_context("invalid port")?; + Ok((net, port)) +} + +fn parse_ipv6_net(s: &str) -> Result<(Ipv6Net, u16)> { + let (net, port) = s.rsplit_once(":").std_context("missing colon")?; + let net: Ipv6Net = net.parse().std_context("invalid net")?; + let port: u16 = port.parse().std_context("invalid port")?; + Ok((net, port)) +} From b5ff035583f19f4010374ef0d9b3eb20cf5e5a90 Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Sat, 22 Nov 2025 14:46:30 +0100 Subject: [PATCH 7/8] cleanup --- iroh/src/magicsock/transports.rs | 1 + iroh/src/magicsock/transports/ip.rs | 26 ++++++++++---------------- 2 files changed, 11 insertions(+), 16 deletions(-) diff --git a/iroh/src/magicsock/transports.rs b/iroh/src/magicsock/transports.rs index df1e8bf1b5..168fb6fd4a 100644 --- a/iroh/src/magicsock/transports.rs +++ b/iroh/src/magicsock/transports.rs @@ -723,6 +723,7 @@ impl quinn::UdpSender for MagicSender { } } MultipathMappedAddr::Ip(socket_addr) => { + // Ensure IPv6 mapped addresses are converted back let socket_addr = SocketAddr::new(socket_addr.ip().to_canonical(), socket_addr.port()); Addr::Ip(socket_addr) diff --git a/iroh/src/magicsock/transports/ip.rs b/iroh/src/magicsock/transports/ip.rs index e910012585..2b57a796a6 100644 --- a/iroh/src/magicsock/transports/ip.rs +++ b/iroh/src/magicsock/transports/ip.rs @@ -28,32 +28,32 @@ pub(crate) struct IpTransport { pub enum Config { /// Default IPv4 binding V4Default { - /// TODO + /// The IP address to bind on ip_addr: Ipv4Addr, /// The port to bind on port: u16, }, /// Default IPv6 binding V6Default { - /// TODO + /// The IP address to bind on ip_addr: Ipv6Addr, /// The scope_id scope_id: u32, /// The port to bind on port: u16, }, - /// IPv4 binding + /// General IPv4 binding V4 { - /// TODO + /// The IP address to bind on ip_addr: Ipv4Net, /// The port to bind on port: u16, }, - /// IPv6 binding + /// General IPv6 binding V6 { - /// TODO + /// The IP address to bind on ip_addr: Ipv6Net, - /// The scope id + /// The scope id. scope_id: u32, /// The port to bind on port: u16, @@ -76,8 +76,8 @@ impl Config { matches!(self, Self::V4Default { .. } | Self::V6Default { .. }) } - /// TODO - pub fn is_valid_send_addr(&self, dst: SocketAddr) -> bool { + /// Does this configuration match to send to the given `dst` address. + pub(crate) fn is_valid_send_addr(&self, dst: SocketAddr) -> bool { match self { Self::V4Default { .. } => matches!(dst, SocketAddr::V4(_)), Self::V6Default { .. } => matches!(dst, SocketAddr::V6(_)), @@ -274,13 +274,7 @@ pub(super) struct IpSender { impl IpSender { pub(super) fn is_valid_send_addr(&self, dst: &SocketAddr) -> bool { - // Our net-tools crate binds sockets to their specific family. This means an IPv6 - // socket can not sent to IPv4, on any platform. So we need to convert an - // IPv4-mapped IPv6 address back to it's canonical IPv4 address. - let mut dst = *dst; - dst.set_ip(dst.ip().to_canonical()); - - self.config.is_valid_send_addr(dst) + self.config.is_valid_send_addr(*dst) } /// Creates a canonical socket address. From 232524f6da48aa6fc0f0b8fb654765d7732ff238 Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Fri, 28 Nov 2025 13:00:17 +0100 Subject: [PATCH 8/8] cleanup --- iroh/src/magicsock/transports/ip.rs | 7 ------- 1 file changed, 7 deletions(-) diff --git a/iroh/src/magicsock/transports/ip.rs b/iroh/src/magicsock/transports/ip.rs index 2b57a796a6..e4874a6c12 100644 --- a/iroh/src/magicsock/transports/ip.rs +++ b/iroh/src/magicsock/transports/ip.rs @@ -335,13 +335,6 @@ pub(super) struct IpTransportsSender { } impl IpTransportsSender { - pub(super) fn v4_iter(&self) -> impl Iterator { - self.v4.iter().chain(self.v4_default.iter()) - } - - pub(super) fn v6_iter(&self) -> impl Iterator { - self.v6.iter().chain(self.v6_default.iter()) - } pub(super) fn v4_iter_mut(&mut self) -> impl Iterator { self.v4.iter_mut().chain(self.v4_default.iter_mut()) }