Skip to content

Commit 30c23e8

Browse files
authored
fix: stop relay actor on endpoint close (#3601)
## Description Since the transport abstraction was introduced, we no longer shut down the relay actor when calling `Endpoint::close`. The relay actor is currently only stopped when the last clone of the `iroh::Endpoint` is dropped. This is surprising and wasteful. This PR fixes this so that the relay actor is stopped when calling `Endpoint::close`. Background: Since introducing the transport abstraction, the relay actor handle is now owned by `Transports` which is owned by `MagicUdpSocket` which is owned by the `quinn::Endpoint` as its `AsyncUdpSocket`. The `AsyncUdpSocket` trait has no notification when the endpoint is closing, thus currently the relay transport (and the relay actor, which it owns) is not notified at all when the endpoint is shutting down. Therefore, it keeps running (and potentially reconnecting to the relay or receiving messages) until it is stopped when the last clone of the endpoint is dropped and thus the `AbortOnDropHandle` owning the relay actor tasks is dropped, finally stopping the relay actor tasks. This PR exposes an existing cancellation token within the relay actor to the magicsocket, allowing it to cancel the relay actors when calling `Endpoint::close`. ## Breaking Changes <!-- Optional, if there are any breaking changes document them, including how to migrate older code. --> ## Notes & open questions I tested this by looking at logs after closing an endpoint. Not sure if there is a good way to test this. I'd vote to merge this even without a test because it fixes an issue, unless someone has a good idea on how to test this. ## Change checklist <!-- Remove any that are not relevant. --> - [x] Self-review. - [x] Documentation updates following the [style guide](https://rust-lang.github.io/rfcs/1574-more-api-documentation-conventions.html#appendix-a-full-conventions-text), if relevant. - [ ] Tests if relevant. - [x] All breaking changes documented. - [ ] List all breaking changes in the above "Breaking Changes" section. - [ ] Open an issue or PR on any number0 repos that are affected by this breaking change. Give guidance on how the updates should be handled or do the actual updates themselves. The major ones are: - [ ] [`quic-rpc`](https://github.com/n0-computer/quic-rpc) - [ ] [`iroh-gossip`](https://github.com/n0-computer/iroh-gossip) - [ ] [`iroh-blobs`](https://github.com/n0-computer/iroh-blobs) - [ ] [`dumbpipe`](https://github.com/n0-computer/dumbpipe) - [ ] [`sendme`](https://github.com/n0-computer/sendme)
1 parent 7b9c95b commit 30c23e8

File tree

3 files changed

+25
-23
lines changed

3 files changed

+25
-23
lines changed

iroh/src/magicsock.rs

Lines changed: 21 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -149,8 +149,8 @@ pub(crate) struct Handle {
149149
msock: Arc<MagicSock>,
150150
// empty when shutdown
151151
actor_task: Arc<Mutex<Option<AbortOnDropHandle<()>>>>,
152-
/// Token to cancel the actor task.
153-
actor_token: CancellationToken,
152+
/// Token to cancel the actor task and shutdown the relay transport.
153+
shutdown_token: CancellationToken,
154154
// quinn endpoint
155155
endpoint: quinn::Endpoint,
156156
}
@@ -1392,17 +1392,22 @@ impl Handle {
13921392
let my_relay = Watchable::new(None);
13931393
let ipv6_reported = Arc::new(AtomicBool::new(ipv6_reported));
13941394

1395-
let relay_transport = RelayTransport::new(RelayActorConfig {
1396-
my_relay: my_relay.clone(),
1397-
secret_key: secret_key.clone(),
1398-
#[cfg(not(wasm_browser))]
1399-
dns_resolver: dns_resolver.clone(),
1400-
proxy_url: proxy_url.clone(),
1401-
ipv6_reported: ipv6_reported.clone(),
1402-
#[cfg(any(test, feature = "test-utils"))]
1403-
insecure_skip_relay_cert_verify,
1404-
metrics: metrics.magicsock.clone(),
1405-
});
1395+
let shutdown_token = CancellationToken::new();
1396+
1397+
let relay_transport = RelayTransport::new(
1398+
RelayActorConfig {
1399+
my_relay: my_relay.clone(),
1400+
secret_key: secret_key.clone(),
1401+
#[cfg(not(wasm_browser))]
1402+
dns_resolver: dns_resolver.clone(),
1403+
proxy_url: proxy_url.clone(),
1404+
ipv6_reported: ipv6_reported.clone(),
1405+
#[cfg(any(test, feature = "test-utils"))]
1406+
insecure_skip_relay_cert_verify,
1407+
metrics: metrics.magicsock.clone(),
1408+
},
1409+
shutdown_token.child_token(),
1410+
);
14061411
let relay_transports = vec![relay_transport];
14071412

14081413
let secret_encryption_key = secret_ed_box(&secret_key);
@@ -1531,12 +1536,9 @@ impl Handle {
15311536
#[cfg(not(wasm_browser))]
15321537
actor.update_direct_addresses(None);
15331538

1534-
let actor_token = CancellationToken::new();
1535-
let token = actor_token.clone();
1536-
15371539
let actor_task = task::spawn(
15381540
actor
1539-
.run(token, local_addrs_watch, sender)
1541+
.run(shutdown_token.child_token(), local_addrs_watch, sender)
15401542
.instrument(info_span!("actor")),
15411543
);
15421544

@@ -1546,7 +1548,7 @@ impl Handle {
15461548
msock,
15471549
actor_task,
15481550
endpoint,
1549-
actor_token,
1551+
shutdown_token,
15501552
})
15511553
}
15521554

@@ -1587,7 +1589,7 @@ impl Handle {
15871589
return;
15881590
}
15891591
self.msock.closing.store(true, Ordering::Relaxed);
1590-
self.actor_token.cancel();
1592+
self.shutdown_token.cancel();
15911593

15921594
// MutexGuard is not held across await points
15931595
let task = self.actor_task.lock().expect("poisoned").take();

iroh/src/magicsock/transports/relay.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use n0_future::{
1313
};
1414
use n0_watcher::{Watchable, Watcher as _};
1515
use tokio::sync::mpsc;
16-
use tokio_util::sync::PollSender;
16+
use tokio_util::sync::{CancellationToken, PollSender};
1717
use tracing::{Instrument, error, info_span, trace, warn};
1818

1919
use super::{Addr, Transmit};
@@ -38,7 +38,7 @@ pub(crate) struct RelayTransport {
3838
}
3939

4040
impl RelayTransport {
41-
pub(crate) fn new(config: RelayActorConfig) -> Self {
41+
pub(crate) fn new(config: RelayActorConfig, cancel_token: CancellationToken) -> Self {
4242
let (relay_datagram_send_tx, relay_datagram_send_rx) = mpsc::channel(256);
4343

4444
let (relay_datagram_recv_tx, relay_datagram_recv_rx) = mpsc::channel(512);
@@ -48,7 +48,7 @@ impl RelayTransport {
4848
let my_endpoint_id = config.secret_key.public();
4949
let my_relay = config.my_relay.clone();
5050

51-
let relay_actor = RelayActor::new(config, relay_datagram_recv_tx);
51+
let relay_actor = RelayActor::new(config, relay_datagram_recv_tx, cancel_token);
5252

5353
let actor_handle = AbortOnDropHandle::new(task::spawn(
5454
async move {

iroh/src/magicsock/transports/relay/actor.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -855,8 +855,8 @@ impl RelayActor {
855855
pub(super) fn new(
856856
config: Config,
857857
relay_datagram_recv_queue: mpsc::Sender<RelayRecvDatagram>,
858+
cancel_token: CancellationToken,
858859
) -> Self {
859-
let cancel_token = CancellationToken::new();
860860
Self {
861861
config,
862862
relay_datagram_recv_queue,

0 commit comments

Comments
 (0)