Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 14 additions & 5 deletions src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@

use std::collections::HashMap;
use std::sync::{LazyLock, Mutex};
use std::thread;
use std::thread::{self, JoinHandle};

use crate::ipc::OpaqueIpcReceiver;
use crate::ipc::{self, IpcMessage, IpcReceiver, IpcReceiverSet, IpcSelectionResult, IpcSender};
use crossbeam_channel::{self, Receiver, Sender};
use serde::{Deserialize, Serialize};

use crate::ipc::{
self, IpcMessage, IpcReceiver, IpcReceiverSet, IpcSelectionResult, IpcSender, OpaqueIpcReceiver,
};

/// Global object wrapping a `RouterProxy`.
/// Add routes ([add_route](RouterProxy::add_route)), or convert IpcReceiver<T>
/// to crossbeam channels (e.g. [route_ipc_receiver_to_new_crossbeam_receiver](RouterProxy::route_ipc_receiver_to_new_crossbeam_receiver))
Expand All @@ -44,7 +46,7 @@ impl RouterProxy {
// Router proxy takes both sending ends.
let (msg_sender, msg_receiver) = crossbeam_channel::unbounded();
let (wakeup_sender, wakeup_receiver) = ipc::channel().unwrap();
thread::Builder::new()
let handle = thread::Builder::new()
.name("router-proxy".to_string())
.spawn(move || Router::new(msg_receiver, wakeup_receiver).run())
.expect("Failed to spawn router proxy thread");
Expand All @@ -53,6 +55,7 @@ impl RouterProxy {
msg_sender,
wakeup_sender,
shutdown: false,
handle: Some(handle),
}),
}
}
Expand Down Expand Up @@ -117,6 +120,11 @@ impl RouterProxy {
ack_receiver.recv().unwrap();
})
.unwrap();
comm.handle
.take()
.expect("Should have a join handle at shutdown")
.join()
.expect("Failed to join on the router proxy thread");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: we can avoid panicking here:

if let Err(err) = handle.join() {
    error!("Router thread panicked during shutdown: {:?}", err);
}

}

/// A convenience function to route an `IpcReceiver<T>` to an existing `Sender<T>`.
Expand Down Expand Up @@ -152,6 +160,7 @@ struct RouterProxyComm {
msg_sender: Sender<RouterMsg>,
wakeup_sender: IpcSender<()>,
shutdown: bool,
handle: Option<JoinHandle<()>>,
}

/// Router runs in its own thread listening for events. Adds events to its IpcReceiverSet
Expand Down Expand Up @@ -211,7 +220,7 @@ impl Router {
sender
.send(())
.expect("Failed to send comfirmation of shutdown.");
break;
return;
},
}
},
Expand Down
45 changes: 25 additions & 20 deletions src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,16 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.

#[cfg(not(any(feature = "force-inprocess", target_os = "android", target_os = "ios")))]
use crate::ipc::IpcReceiver;
use crate::ipc::{self, IpcReceiverSet, IpcSender, IpcSharedMemory};
use crate::router::{RouterProxy, ROUTER};
use crossbeam_channel::{self, Sender};
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use std::cell::RefCell;
#[cfg(not(any(feature = "force-inprocess", target_os = "android", target_os = "ios")))]
use std::env;
use std::iter;
#[cfg(not(any(
feature = "force-inprocess",
target_os = "android",
target_os = "ios",
target_os = "windows",
)))]
use std::io::Error;
#[cfg(not(any(feature = "force-inprocess", target_os = "android", target_os = "ios",)))]
use std::process::{self, Command, Stdio};
#[cfg(not(any(
Expand All @@ -27,7 +27,11 @@ use std::process::{self, Command, Stdio};
)))]
use std::ptr;
use std::rc::Rc;
use std::thread;
use std::time::{Duration, Instant};
use std::{iter, thread};

use crossbeam_channel::{self, Sender};
use serde::{Deserialize, Deserializer, Serialize, Serializer};

#[cfg(not(any(
feature = "force-inprocess",
Expand All @@ -36,15 +40,10 @@ use std::thread;
target_os = "windows"
)))]
use crate::ipc::IpcOneShotServer;

#[cfg(not(any(
feature = "force-inprocess",
target_os = "android",
target_os = "ios",
target_os = "windows",
)))]
use std::io::Error;
use std::time::{Duration, Instant};
#[cfg(not(any(feature = "force-inprocess", target_os = "android", target_os = "ios")))]
use crate::ipc::IpcReceiver;
use crate::ipc::{self, IpcReceiverSet, IpcSender, IpcSharedMemory};
use crate::router::{RouterProxy, ROUTER};

#[cfg(not(any(
feature = "force-inprocess",
Expand Down Expand Up @@ -340,6 +339,12 @@ fn router_flood() {
}
}

#[test]
fn router_shutdown() {
let router = RouterProxy::new();
router.shutdown();
}

#[test]
fn router_routing_to_new_crossbeam_receiver() {
let person = ("Patrick Walton".to_owned(), 29);
Expand Down Expand Up @@ -722,10 +727,10 @@ fn transfer_closed_sender() {
#[cfg(feature = "async")]
#[test]
fn test_receiver_stream() {
use futures_core::task::Context;
use futures_core::task::Poll;
use futures_core::Stream;
use std::pin::Pin;

use futures_core::task::{Context, Poll};
use futures_core::Stream;
let (tx, rx) = ipc::channel().unwrap();
let (waker, count) = futures_test::task::new_count_waker();
let mut ctx = Context::from_waker(&waker);
Expand Down
Loading