Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 111 additions & 31 deletions iroh/examples/transfer.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::{
net::SocketAddr,
net::{SocketAddr, SocketAddrV4, SocketAddrV6},
str::FromStr,
time::{Duration, Instant},
};
Expand All @@ -20,6 +20,7 @@ use iroh::{
};
use n0_error::{Result, StackResultExt, StdResultExt};
use n0_future::task::AbortOnDropHandle;
use netdev::ipnet::{Ipv4Net, Ipv6Net};
use tokio_stream::StreamExt;
use tracing::{info, warn};
use url::Url;
Expand Down Expand Up @@ -112,6 +113,9 @@ struct EndpointArgs {
/// Disable relays completely.
#[clap(long, conflicts_with = "relay_url")]
no_relay: bool,
/// Disable discovery completely.
#[clap(long, conflicts_with_all = ["pkarr_relay_url", "no_pkarr_publish", "dns_origin_domain", "no_dns_resolve"])]
no_discovery: bool,
/// If set no direct connections will be established.
#[clap(long)]
relay_only: bool,
Expand All @@ -133,6 +137,27 @@ struct EndpointArgs {
#[clap(long)]
/// Enable mDNS discovery.
mdns: bool,
/// Set the default IPv4 bind address.
#[clap(long)]
bind_addr_v4: Option<SocketAddrV4>,
/// Set additional IPv4 bind addresses.
///
/// Syntax is "addr/mask:port", so e.g. "10.0.0.1/16:1234".
/// The mask is used to define for which destinations this bind address is used.
#[clap(long)]
bind_addr_v4_additional: Vec<String>,
/// Set the default IPv6 bind address.
#[clap(long)]
bind_addr_v6: Option<SocketAddrV6>,
/// Set additional IPv6 bind addresses.
///
/// Syntax is "addr/mask:port", so e.g. "2001:db8::1/16:1234".
/// The mask is used to define for which destinations this bind address is used.
#[clap(long)]
bind_addr_v6_additional: Vec<String>,
/// Disable all default bind addresses.
#[clap(long)]
no_default_bind: bool,
}

#[derive(Subcommand, Debug)]
Expand Down Expand Up @@ -191,7 +216,14 @@ async fn main() -> Result<()> {

impl EndpointArgs {
async fn bind_endpoint(self) -> Result<Endpoint> {
let mut builder = Endpoint::builder();
let relay_mode = if self.no_relay {
RelayMode::Disabled
} else if !self.relay_url.is_empty() {
RelayMode::Custom(RelayMap::from_iter(self.relay_url))
} else {
self.env.relay_mode()
};
let mut builder = Endpoint::empty_builder(relay_mode);

let secret_key = match std::env::var("IROH_SECRET") {
Ok(s) => SecretKey::from_str(&s)
Expand All @@ -218,31 +250,20 @@ impl EndpointArgs {
}
}

let relay_mode = if self.no_relay {
RelayMode::Disabled
} else if !self.relay_url.is_empty() {
RelayMode::Custom(RelayMap::from_iter(self.relay_url))
} else {
self.env.relay_mode()
};
builder = builder.relay_mode(relay_mode);

if !self.no_pkarr_publish {
let url = self
.pkarr_relay_url
.unwrap_or_else(|| self.env.pkarr_relay_url());
builder = builder.discovery(PkarrPublisher::builder(url));
}

if !self.no_dns_resolve {
let domain = self
.dns_origin_domain
.unwrap_or_else(|| self.env.dns_origin_domain());
builder = builder.discovery(DnsDiscovery::builder(domain));
}
if !self.no_discovery {
if !self.no_pkarr_publish {
let url = self
.pkarr_relay_url
.unwrap_or_else(|| self.env.pkarr_relay_url());
builder = builder.discovery(PkarrPublisher::builder(url));
}

if self.relay_only {
builder = builder.clear_ip_transports();
if !self.no_dns_resolve {
let domain = self
.dns_origin_domain
.unwrap_or_else(|| self.env.dns_origin_domain());
builder = builder.discovery(DnsDiscovery::builder(domain));
}
}

if let Some(host) = self.dns_server {
Expand All @@ -257,6 +278,28 @@ impl EndpointArgs {
builder = builder.dns_resolver(DnsResolver::with_nameserver(addr));
}

if self.relay_only || self.no_default_bind {
builder = builder.clear_ip_transports();
}

if let Some(addr) = self.bind_addr_v4 {
builder = builder.bind_addr_v4_default(*addr.ip(), addr.port());
}
for addr in self.bind_addr_v4_additional {
let (net, port) = parse_ipv4_net(&addr)
.with_context(|_| format!("invalid bind-addr-v4-additional: {addr}"))?;
builder = builder.bind_addr_v4(net, port);
}

if let Some(addr) = self.bind_addr_v6 {
builder = builder.bind_addr_v6_default(*addr.ip(), 0, addr.port());
}
for addr in self.bind_addr_v6_additional {
let (net, port) = parse_ipv6_net(&addr)
.with_context(|_| format!("invalid bind-addr-v6-additional: {addr}"))?;
builder = builder.bind_addr_v6(net, 0, port);
}

let endpoint = builder.alpns(vec![TRANSFER_ALPN.to_vec()]).bind().await?;

if self.mdns {
Expand Down Expand Up @@ -355,11 +398,14 @@ async fn provide(endpoint: Endpoint, size: u64) -> Result<()> {

// We sent the last message, so wait for the client to close the connection once
// it received this message.
let res = tokio::time::timeout(Duration::from_secs(3), async move {
let closed = conn.closed().await;
let remote = endpoint_id.fmt_short();
if !matches!(closed, ConnectionError::ApplicationClosed(_)) {
println!("[{remote}] Endpoint disconnected with an error: {closed:#}");
let res = tokio::time::timeout(Duration::from_secs(3), {
let conn = conn.clone();
async move {
let closed = conn.closed().await;
let remote = endpoint_id.fmt_short();
if !matches!(closed, ConnectionError::ApplicationClosed(_)) {
println!("[{remote}] Endpoint disconnected with an error: {closed:#}");
}
}
})
.await;
Expand All @@ -376,6 +422,16 @@ async fn provide(endpoint: Endpoint, size: u64) -> Result<()> {
} else {
println!("[{remote}] Disconnected");
}
println!("[{remote}] Path stats:");
for path in conn.paths().get() {
let stats = path.stats();
println!(
" {:?}: RTT {:?}, {} packets sent",
path.remote_addr(),
stats.rtt,
stats.sent_packets
);
}
n0_error::Ok(())
});
}
Expand Down Expand Up @@ -422,6 +478,16 @@ async fn fetch(endpoint: Endpoint, remote_addr: EndpointAddr) -> Result<()> {
time_to_first_byte.as_secs_f64(),
chnk
);
println!("Path stats:");
for path in conn.paths().get() {
let stats = path.stats();
println!(
" {:?}: RTT {:?}, {} packets sent",
path.remote_addr(),
stats.rtt,
stats.sent_packets
);
}
Ok(())
}

Expand Down Expand Up @@ -545,3 +611,17 @@ fn watch_conn_type(
});
AbortOnDropHandle::new(task)
}

fn parse_ipv4_net(s: &str) -> Result<(Ipv4Net, u16)> {
let (net, port) = s.split_once(":").std_context("missing colon")?;
let net: Ipv4Net = net.parse().std_context("invalid net")?;
let port: u16 = port.parse().std_context("invalid port")?;
Ok((net, port))
}

fn parse_ipv6_net(s: &str) -> Result<(Ipv6Net, u16)> {
let (net, port) = s.rsplit_once(":").std_context("missing colon")?;
let net: Ipv6Net = net.parse().std_context("invalid net")?;
let port: u16 = port.parse().std_context("invalid port")?;
Ok((net, port))
}
58 changes: 46 additions & 12 deletions iroh/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,17 @@
//!
//! [module docs]: crate

use std::{
net::{SocketAddr, SocketAddrV4, SocketAddrV6},
sync::Arc,
};
#[cfg(not(wasm_browser))]
use std::net::{Ipv4Addr, Ipv6Addr};
use std::{net::SocketAddr, sync::Arc};

use iroh_base::{EndpointAddr, EndpointId, RelayUrl, SecretKey, TransportAddr};
use iroh_relay::{RelayConfig, RelayMap};
use n0_error::{e, ensure, stack_error};
use n0_future::time::Duration;
use n0_watcher::Watcher;
#[cfg(not(wasm_browser))]
pub use netdev::ipnet::{Ipv4Net, Ipv6Net};
use tracing::{debug, instrument, trace, warn};
use url::Url;

Expand Down Expand Up @@ -73,6 +74,8 @@ pub use self::connection::{
IncomingZeroRttConnection, OutgoingZeroRtt, OutgoingZeroRttConnection, RemoteEndpointIdError,
ZeroRttStatus,
};
#[cfg(not(wasm_browser))]
pub use crate::magicsock::transports::IpConfig;
pub use crate::magicsock::transports::TransportConfig;

/// Builder for [`Endpoint`].
Expand Down Expand Up @@ -236,11 +239,24 @@ impl Builder {
///
/// Setting the port to `0` will use a random port.
/// If the port specified is already in use, it will fallback to choosing a random port.
///
/// Only a single interface can be the default, so this will replace the existing default
#[cfg(not(wasm_browser))]
pub fn bind_addr_v4(mut self, bind_addr: SocketAddrV4) -> Self {
self.transports.push(TransportConfig::Ip {
bind_addr: bind_addr.into(),
});
pub fn bind_addr_v4_default(mut self, ip_addr: Ipv4Addr, port: u16) -> Self {
self.transports
.retain(|t| !matches!(t, TransportConfig::Ip(IpConfig::V4Default { .. })));
self.transports
.push(TransportConfig::Ip(IpConfig::V4Default { ip_addr, port }));
self
}

/// Binds an ipv4 socket
///
/// If you want to remove the default transports, make sure to call `clear_ip` first.
#[cfg(not(wasm_browser))]
pub fn bind_addr_v4(mut self, ip_addr: Ipv4Net, port: u16) -> Self {
self.transports
.push(TransportConfig::Ip(IpConfig::V4 { ip_addr, port }));
self
}

Expand All @@ -250,11 +266,29 @@ impl Builder {
///
/// Setting the port to `0` will use a random port.
/// If the port specified is already in use, it will fallback to choosing a random port.
///
/// Only a single interface can be the default, so this will replace the existing default
#[cfg(not(wasm_browser))]
pub fn bind_addr_v6(mut self, bind_addr: SocketAddrV6) -> Self {
self.transports.push(TransportConfig::Ip {
bind_addr: bind_addr.into(),
});
pub fn bind_addr_v6_default(mut self, ip_addr: Ipv6Addr, scope_id: u32, port: u16) -> Self {
self.transports
.push(TransportConfig::Ip(IpConfig::V6Default {
ip_addr,
scope_id,
port,
}));
self
}

/// Binds an ipv6 socket
///
/// If you want to remove the default transports, make sure to call `clear_ip` first.
#[cfg(not(wasm_browser))]
pub fn bind_addr_v6(mut self, ip_addr: Ipv6Net, scope_id: u32, port: u16) -> Self {
self.transports.push(TransportConfig::Ip(IpConfig::V6 {
ip_addr,
scope_id,
port,
}));
self
}

Expand Down
Loading
Loading