diff --git a/src/router.rs b/src/router.rs index 45e7f9d9..449dae5f 100644 --- a/src/router.rs +++ b/src/router.rs @@ -15,13 +15,15 @@ use std::collections::HashMap; use std::sync::{LazyLock, Mutex}; -use std::thread; +use std::thread::{self, JoinHandle}; -use crate::ipc::OpaqueIpcReceiver; -use crate::ipc::{self, IpcMessage, IpcReceiver, IpcReceiverSet, IpcSelectionResult, IpcSender}; use crossbeam_channel::{self, Receiver, Sender}; use serde::{Deserialize, Serialize}; +use crate::ipc::{ + self, IpcMessage, IpcReceiver, IpcReceiverSet, IpcSelectionResult, IpcSender, OpaqueIpcReceiver, +}; + /// Global object wrapping a `RouterProxy`. /// Add routes ([add_route](RouterProxy::add_route)), or convert IpcReceiver /// to crossbeam channels (e.g. [route_ipc_receiver_to_new_crossbeam_receiver](RouterProxy::route_ipc_receiver_to_new_crossbeam_receiver)) @@ -44,7 +46,7 @@ impl RouterProxy { // Router proxy takes both sending ends. let (msg_sender, msg_receiver) = crossbeam_channel::unbounded(); let (wakeup_sender, wakeup_receiver) = ipc::channel().unwrap(); - thread::Builder::new() + let handle = thread::Builder::new() .name("router-proxy".to_string()) .spawn(move || Router::new(msg_receiver, wakeup_receiver).run()) .expect("Failed to spawn router proxy thread"); @@ -53,6 +55,7 @@ impl RouterProxy { msg_sender, wakeup_sender, shutdown: false, + handle: Some(handle), }), } } @@ -117,6 +120,11 @@ impl RouterProxy { ack_receiver.recv().unwrap(); }) .unwrap(); + comm.handle + .take() + .expect("Should have a join handle at shutdown") + .join() + .expect("Failed to join on the router proxy thread"); } /// A convenience function to route an `IpcReceiver` to an existing `Sender`. @@ -152,6 +160,7 @@ struct RouterProxyComm { msg_sender: Sender, wakeup_sender: IpcSender<()>, shutdown: bool, + handle: Option>, } /// Router runs in its own thread listening for events. Adds events to its IpcReceiverSet @@ -211,7 +220,7 @@ impl Router { sender .send(()) .expect("Failed to send comfirmation of shutdown."); - break; + return; }, } }, diff --git a/src/test.rs b/src/test.rs index 4f4f3e0a..aed0315b 100644 --- a/src/test.rs +++ b/src/test.rs @@ -7,16 +7,16 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. -#[cfg(not(any(feature = "force-inprocess", target_os = "android", target_os = "ios")))] -use crate::ipc::IpcReceiver; -use crate::ipc::{self, IpcReceiverSet, IpcSender, IpcSharedMemory}; -use crate::router::{RouterProxy, ROUTER}; -use crossbeam_channel::{self, Sender}; -use serde::{Deserialize, Deserializer, Serialize, Serializer}; use std::cell::RefCell; #[cfg(not(any(feature = "force-inprocess", target_os = "android", target_os = "ios")))] use std::env; -use std::iter; +#[cfg(not(any( + feature = "force-inprocess", + target_os = "android", + target_os = "ios", + target_os = "windows", +)))] +use std::io::Error; #[cfg(not(any(feature = "force-inprocess", target_os = "android", target_os = "ios",)))] use std::process::{self, Command, Stdio}; #[cfg(not(any( @@ -27,7 +27,11 @@ use std::process::{self, Command, Stdio}; )))] use std::ptr; use std::rc::Rc; -use std::thread; +use std::time::{Duration, Instant}; +use std::{iter, thread}; + +use crossbeam_channel::{self, Sender}; +use serde::{Deserialize, Deserializer, Serialize, Serializer}; #[cfg(not(any( feature = "force-inprocess", @@ -36,15 +40,10 @@ use std::thread; target_os = "windows" )))] use crate::ipc::IpcOneShotServer; - -#[cfg(not(any( - feature = "force-inprocess", - target_os = "android", - target_os = "ios", - target_os = "windows", -)))] -use std::io::Error; -use std::time::{Duration, Instant}; +#[cfg(not(any(feature = "force-inprocess", target_os = "android", target_os = "ios")))] +use crate::ipc::IpcReceiver; +use crate::ipc::{self, IpcReceiverSet, IpcSender, IpcSharedMemory}; +use crate::router::{RouterProxy, ROUTER}; #[cfg(not(any( feature = "force-inprocess", @@ -340,6 +339,12 @@ fn router_flood() { } } +#[test] +fn router_shutdown() { + let router = RouterProxy::new(); + router.shutdown(); +} + #[test] fn router_routing_to_new_crossbeam_receiver() { let person = ("Patrick Walton".to_owned(), 29); @@ -722,10 +727,10 @@ fn transfer_closed_sender() { #[cfg(feature = "async")] #[test] fn test_receiver_stream() { - use futures_core::task::Context; - use futures_core::task::Poll; - use futures_core::Stream; use std::pin::Pin; + + use futures_core::task::{Context, Poll}; + use futures_core::Stream; let (tx, rx) = ipc::channel().unwrap(); let (waker, count) = futures_test::task::new_count_waker(); let mut ctx = Context::from_waker(&waker);