Skip to content

Commit a269da5

Browse files
committed
Add one-shot handlers to Router API.
Signed-off-by: Josh Matthews <[email protected]>
1 parent c680ac0 commit a269da5

File tree

2 files changed

+79
-7
lines changed

2 files changed

+79
-7
lines changed

src/router.rs

Lines changed: 56 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -77,8 +77,32 @@ impl RouterProxy {
7777

7878
/// Add a new `(receiver, callback)` pair to the router, and send a wakeup message
7979
/// to the router.
80-
pub fn add_typed_route<T>(&self, receiver: IpcReceiver<T>, mut callback: TypedRouterHandler<T>)
81-
where
80+
pub fn add_typed_route<T>(
81+
&self,
82+
receiver: IpcReceiver<T>,
83+
mut callback: TypedRouterMultiHandler<T>,
84+
) where
85+
T: Serialize + for<'de> Deserialize<'de> + 'static,
86+
{
87+
// Before passing the message on to the callback, turn it into the appropriate type
88+
let modified_callback = move |msg: IpcMessage| {
89+
let typed_message = msg.to::<T>();
90+
callback(typed_message)
91+
};
92+
93+
self.add_route(
94+
receiver.to_opaque(),
95+
RouterHandler::Multi(Box::new(modified_callback)),
96+
);
97+
}
98+
99+
/// Add a new `(receiver, callback)` pair to the router, and send a wakeup message
100+
/// to the router.
101+
pub fn add_typed_one_shot_route<T>(
102+
&self,
103+
receiver: IpcReceiver<T>,
104+
callback: TypedRouterOneShotHandler<T>,
105+
) where
82106
T: Serialize + for<'de> Deserialize<'de> + 'static,
83107
{
84108
// Before passing the message on to the callback, turn it into the appropriate type
@@ -87,7 +111,10 @@ impl RouterProxy {
87111
callback(typed_message)
88112
};
89113

90-
self.add_route(receiver.to_opaque(), Box::new(modified_callback));
114+
self.add_route(
115+
receiver.to_opaque(),
116+
RouterHandler::Once(Box::new(modified_callback)),
117+
);
91118
}
92119

93120
/// Send a shutdown message to the router containing a ACK sender,
@@ -218,7 +245,18 @@ impl Router {
218245
},
219246
// Event from one of our registered receivers, call callback.
220247
IpcSelectionResult::MessageReceived(id, message) => {
221-
self.handlers.get_mut(&id).unwrap()(message)
248+
match self.handlers.get_mut(&id).unwrap() {
249+
RouterHandler::Once(_) => {},
250+
RouterHandler::Multi(handler) => {
251+
(handler)(message);
252+
continue;
253+
},
254+
};
255+
let RouterHandler::Once(handler) = self.handlers.remove(&id).unwrap()
256+
else {
257+
continue;
258+
};
259+
(handler)(message);
222260
},
223261
IpcSelectionResult::ChannelClosed(id) => {
224262
let _ = self.handlers.remove(&id).unwrap();
@@ -238,7 +276,18 @@ enum RouterMsg {
238276
}
239277

240278
/// Function to call when a new event is received from the corresponding receiver.
241-
pub type RouterHandler = Box<dyn FnMut(IpcMessage) + Send>;
279+
pub type RouterMultiHandler = Box<dyn FnMut(IpcMessage) + Send>;
280+
281+
/// Function to call once when a new event is received from the corresponding receiver.
282+
pub type RouterOneShotHandler = Box<dyn FnOnce(IpcMessage) + Send>;
283+
284+
enum RouterHandler {
285+
Once(RouterOneShotHandler),
286+
Multi(RouterMultiHandler),
287+
}
288+
289+
/// Like [RouterMultiHandler] but includes the type that will be passed to the callback
290+
pub type TypedRouterMultiHandler<T> = Box<dyn FnMut(Result<T, bincode::Error>) + Send>;
242291

243-
/// Like [RouterHandler] but includes the type that will be passed to the callback
244-
pub type TypedRouterHandler<T> = Box<dyn FnMut(Result<T, bincode::Error>) + Send>;
292+
/// Like [RouterOneShotHandler] but includes the type that will be passed to the callback
293+
pub type TypedRouterOneShotHandler<T> = Box<dyn FnOnce(Result<T, bincode::Error>) + Send>;

src/test.rs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -356,6 +356,29 @@ fn router_routing_to_new_crossbeam_receiver() {
356356
assert_eq!(received_person, person);
357357
}
358358

359+
#[test]
360+
fn router_once_handler() {
361+
let person = ("Patrick Walton".to_owned(), 29);
362+
let (tx, rx) = ipc::channel().unwrap();
363+
let (tx2, rx2) = ipc::channel().unwrap();
364+
365+
let router = RouterProxy::new();
366+
let mut once_tx2 = Some(tx2);
367+
router.add_typed_one_shot_route(
368+
rx,
369+
Box::new(move |_msg| once_tx2.take().unwrap().send(()).unwrap()),
370+
);
371+
372+
// Send one single event.
373+
tx.send(person.clone()).unwrap();
374+
// Wait for acknowledgement that the callback ran.
375+
rx2.recv().unwrap();
376+
// This send should succeed but no handler should run. If it does run,
377+
// a panic will occur.
378+
tx.send(person.clone()).unwrap();
379+
assert!(rx2.recv().is_err());
380+
}
381+
359382
#[test]
360383
fn router_multiplexing() {
361384
let person = ("Patrick Walton".to_owned(), 29);

0 commit comments

Comments
 (0)