Skip to content

Commit ef6c91d

Browse files
committed
Merge #1486: Overhaul stats: Improve events package (part 3)
d98d93f refactor: [#1485] simplify event mods in pakages using the events pkg (Jose Celano) 97eb20a test: [#1485] add unit tests to events pkg (Jose Celano) 5a087d0 refactor: [#1485] replace Arc<Box<dyn Sender>> with Arc<dyn Sender> to avoid double allocations (Jose Celano) 8f5b57e refactor: [#1485] replace Arc<Option> with Option<Arc> in events pkg (Jose Celano) c2df95f refactor: [#1485] rename trait fn send_event to send (Jose Celano) f546bc1 refactor: [#1485] normalize event type parameter name in events pkg (Jose Celano) 3057f48 refactor: [#1485] decouple events traits from tokio broadcast channel implementation (Jose Celano) e3703c1 feat: [#1485] add Receiver trait to events package (Jose Celano) d4343c0 fix: [#1485] remove duplicate const (Jose Celano) Pull request description: Continuation of #1483 ### Subtasks - [x] `Receiver` trait. - [x] Decouple `Sender` and `Receiver` traits from the tokio broadcast implementation. Wrapper for `SendError` and `RecvError`. - [x] Return an `Option<Arc<...>>` instead of an `Arc<Option<...>>` in the `EventBus` implementation. - [x] Replace `Arc<Box<dyn Sender>>` with `Arc<dyn Sender>` to avoid double allocations. - [x] Add unit tests. ACKs for top commit: josecelano: ACK d98d93f Tree-SHA512: 6798238ff6dc2f6e06ecd077c2f0ddd56cd1e52bbb803273535bde6eec5c1334603d26c78daedb82eac886c5146fc575430520c0d29e7565e0fcb32ecdfe112c
2 parents b6ed3f3 + d98d93f commit ef6c91d

File tree

33 files changed

+383
-158
lines changed

33 files changed

+383
-158
lines changed

packages/events/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ version.workspace = true
1616

1717
[dependencies]
1818
futures = "0"
19-
tokio = { version = "1", features = ["macros", "net", "rt-multi-thread", "signal", "sync"] }
19+
tokio = { version = "1", features = ["macros", "net", "rt-multi-thread", "signal", "sync", "time"] }
2020

2121
[dev-dependencies]
2222
mockall = "0"

packages/events/src/broadcaster.rs

Lines changed: 99 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,36 +1,117 @@
11
use futures::future::BoxFuture;
22
use futures::FutureExt;
3-
use tokio::sync::broadcast::error::SendError;
43
use tokio::sync::broadcast::{self};
54

6-
use crate::sender::Sender;
5+
use crate::receiver::{Receiver, RecvError};
6+
use crate::sender::{SendError, Sender};
77

88
const CHANNEL_CAPACITY: usize = 32768;
99

10-
/// An event sender implementation using a broadcast channel.
11-
#[derive(Clone)]
12-
pub struct Broadcaster<E: Sync + Send + Clone> {
13-
pub(crate) sender: broadcast::Sender<E>,
10+
/// An event sender and receiver implementation using a broadcast channel.
11+
#[derive(Clone, Debug)]
12+
pub struct Broadcaster<Event: Sync + Send + Clone> {
13+
pub(crate) sender: broadcast::Sender<Event>,
1414
}
1515

16-
impl<E: Sync + Send + Clone> Sender for Broadcaster<E> {
17-
type Event = E;
18-
19-
fn send_event(&self, event: E) -> BoxFuture<'_, Option<Result<usize, SendError<E>>>> {
20-
async move { Some(self.sender.send(event)) }.boxed()
21-
}
22-
}
23-
24-
impl<E: Sync + Send + Clone> Default for Broadcaster<E> {
16+
impl<Event: Sync + Send + Clone> Default for Broadcaster<Event> {
2517
fn default() -> Self {
26-
let (sender, _) = broadcast::channel(CHANNEL_CAPACITY);
18+
let (sender, _receiver) = broadcast::channel(CHANNEL_CAPACITY);
2719
Self { sender }
2820
}
2921
}
3022

31-
impl<E: Sync + Send + Clone> Broadcaster<E> {
23+
impl<Event: Sync + Send + Clone> Broadcaster<Event> {
3224
#[must_use]
33-
pub fn subscribe(&self) -> broadcast::Receiver<E> {
25+
pub fn subscribe(&self) -> broadcast::Receiver<Event> {
3426
self.sender.subscribe()
3527
}
3628
}
29+
30+
impl<Event: Sync + Send + Clone> Sender for Broadcaster<Event> {
31+
type Event = Event;
32+
33+
fn send(&self, event: Event) -> BoxFuture<'_, Option<Result<usize, SendError<Event>>>> {
34+
async move { Some(self.sender.send(event).map_err(std::convert::Into::into)) }.boxed()
35+
}
36+
}
37+
38+
impl<Event: Sync + Send + Clone> Receiver for broadcast::Receiver<Event> {
39+
type Event = Event;
40+
41+
fn recv(&mut self) -> BoxFuture<'_, Result<Self::Event, RecvError>> {
42+
async move { self.recv().await.map_err(std::convert::Into::into) }.boxed()
43+
}
44+
}
45+
46+
impl<Event> From<broadcast::error::SendError<Event>> for SendError<Event> {
47+
fn from(err: broadcast::error::SendError<Event>) -> Self {
48+
SendError(err.0)
49+
}
50+
}
51+
52+
impl From<broadcast::error::RecvError> for RecvError {
53+
fn from(err: broadcast::error::RecvError) -> Self {
54+
match err {
55+
broadcast::error::RecvError::Lagged(amt) => RecvError::Lagged(amt),
56+
broadcast::error::RecvError::Closed => RecvError::Closed,
57+
}
58+
}
59+
}
60+
61+
#[cfg(test)]
62+
mod tests {
63+
use tokio::time::{timeout, Duration};
64+
65+
use super::*;
66+
67+
#[tokio::test]
68+
async fn it_should_allow_sending_an_event_and_received_it() {
69+
let broadcaster = Broadcaster::<String>::default();
70+
71+
let mut receiver = broadcaster.subscribe();
72+
73+
let event = "test";
74+
75+
let _unused = broadcaster.send(event.to_owned()).await.unwrap().unwrap();
76+
77+
let received_event = receiver.recv().await.unwrap();
78+
79+
assert_eq!(received_event, event);
80+
}
81+
82+
#[tokio::test]
83+
async fn it_should_return_the_number_of_receivers_when_and_event_is_sent() {
84+
let broadcaster = Broadcaster::<String>::default();
85+
let mut _receiver = broadcaster.subscribe();
86+
87+
let number_of_receivers = broadcaster.send("test".into()).await;
88+
89+
assert!(matches!(number_of_receivers, Some(Ok(1))));
90+
}
91+
92+
#[tokio::test]
93+
async fn it_should_fail_when_trying_tos_send_with_no_subscribers() {
94+
let event = String::from("test");
95+
96+
let broadcaster = Broadcaster::<String>::default();
97+
98+
let result: Result<usize, SendError<String>> = broadcaster.send(event).await.unwrap();
99+
100+
assert!(matches!(result, Err(SendError::<String>(_event))));
101+
}
102+
103+
#[tokio::test]
104+
async fn it_should_allow_subscribing_multiple_receivers() {
105+
let broadcaster = Broadcaster::<u8>::default();
106+
let mut r1 = broadcaster.subscribe();
107+
let mut r2 = broadcaster.subscribe();
108+
109+
let _ = broadcaster.send(1).await;
110+
111+
let val1 = timeout(Duration::from_secs(1), r1.recv()).await.unwrap().unwrap();
112+
let val2 = timeout(Duration::from_secs(1), r2.recv()).await.unwrap().unwrap();
113+
114+
assert_eq!(val1, 1);
115+
assert_eq!(val2, 1);
116+
}
117+
}

packages/events/src/bus.rs

Lines changed: 71 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,44 +1,101 @@
11
use std::sync::Arc;
22

3-
use tokio::sync::broadcast::Receiver;
4-
53
use crate::broadcaster::Broadcaster;
6-
use crate::sender;
4+
use crate::{receiver, sender};
75

8-
pub struct EventBus<E: Sync + Send + Clone + 'static> {
6+
#[derive(Clone, Debug)]
7+
pub struct EventBus<Event: Sync + Send + Clone + 'static> {
98
pub enable_sender: bool,
10-
pub broadcaster: Broadcaster<E>,
9+
pub broadcaster: Broadcaster<Event>,
1110
}
1211

13-
impl<E: Sync + Send + Clone + 'static> Default for EventBus<E> {
12+
impl<Event: Sync + Send + Clone + 'static> Default for EventBus<Event> {
1413
fn default() -> Self {
1514
let enable_sender = true;
16-
let broadcaster = Broadcaster::<E>::default();
15+
let broadcaster = Broadcaster::<Event>::default();
1716

1817
Self::new(enable_sender, broadcaster)
1918
}
2019
}
2120

22-
impl<E: Sync + Send + Clone + 'static> EventBus<E> {
21+
impl<Event: Sync + Send + Clone + 'static> EventBus<Event> {
2322
#[must_use]
24-
pub fn new(enable_sender: bool, broadcaster: Broadcaster<E>) -> Self {
23+
pub fn new(enable_sender: bool, broadcaster: Broadcaster<Event>) -> Self {
2524
Self {
2625
enable_sender,
2726
broadcaster,
2827
}
2928
}
3029

3130
#[must_use]
32-
pub fn sender(&self) -> Arc<Option<Box<dyn sender::Sender<Event = E>>>> {
31+
pub fn sender(&self) -> Option<Arc<dyn sender::Sender<Event = Event>>> {
3332
if self.enable_sender {
34-
Arc::new(Some(Box::new(self.broadcaster.clone())))
33+
Some(Arc::new(self.broadcaster.clone()))
3534
} else {
36-
Arc::new(None)
35+
None
3736
}
3837
}
3938

4039
#[must_use]
41-
pub fn receiver(&self) -> Receiver<E> {
42-
self.broadcaster.subscribe()
40+
pub fn receiver(&self) -> Box<dyn receiver::Receiver<Event = Event>> {
41+
Box::new(self.broadcaster.subscribe())
42+
}
43+
}
44+
45+
#[cfg(test)]
46+
mod tests {
47+
use tokio::time::{timeout, Duration};
48+
49+
use super::*;
50+
51+
#[tokio::test]
52+
async fn it_should_provide_an_event_sender_when_enabled() {
53+
let bus = EventBus::<String>::new(true, Broadcaster::default());
54+
55+
assert!(bus.sender().is_some());
56+
}
57+
58+
#[tokio::test]
59+
async fn it_should_not_provide_event_sender_when_disabled() {
60+
let bus = EventBus::<String>::new(false, Broadcaster::default());
61+
62+
assert!(bus.sender().is_none());
63+
}
64+
65+
#[tokio::test]
66+
async fn it_should_enabled_by_default() {
67+
let bus = EventBus::<String>::default();
68+
69+
assert!(bus.sender().is_some());
70+
}
71+
72+
#[tokio::test]
73+
async fn it_should_allow_sending_events_that_are_received_by_receivers() {
74+
let bus = EventBus::<String>::default();
75+
let sender = bus.sender().unwrap();
76+
let mut receiver = bus.receiver();
77+
78+
let event = "hello".to_string();
79+
80+
let _unused = sender.send(event.clone()).await.unwrap().unwrap();
81+
82+
let result = timeout(Duration::from_secs(1), receiver.recv()).await;
83+
84+
assert_eq!(result.unwrap().unwrap(), event);
85+
}
86+
87+
#[tokio::test]
88+
async fn it_should_send_a_closed_events_to_receivers_when_sender_is_dropped() {
89+
let bus = EventBus::<String>::default();
90+
91+
let mut receiver = bus.receiver();
92+
93+
let future = receiver.recv();
94+
95+
drop(bus); // explicitly drop sender
96+
97+
let result = timeout(Duration::from_secs(1), future).await;
98+
99+
assert!(matches!(result.unwrap(), Err(crate::receiver::RecvError::Closed)));
43100
}
44101
}

packages/events/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
pub mod broadcaster;
22
pub mod bus;
3+
pub mod receiver;
34
pub mod sender;
45

56
/// Target for tracing crate logs.

packages/events/src/receiver.rs

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
use std::fmt;
2+
3+
use futures::future::BoxFuture;
4+
#[cfg(test)]
5+
use mockall::{automock, predicate::str};
6+
7+
/// A trait for receiving events.
8+
#[cfg_attr(test, automock(type Event=();))]
9+
pub trait Receiver: Sync + Send {
10+
type Event: Send + Clone;
11+
12+
fn recv(&mut self) -> BoxFuture<'_, Result<Self::Event, RecvError>>;
13+
}
14+
15+
/// An error returned from the [`recv`] function on a [`Receiver`].
16+
#[derive(Debug, PartialEq, Eq, Clone)]
17+
pub enum RecvError {
18+
/// There are no more active senders implying no further messages will ever
19+
/// be sent.
20+
Closed,
21+
22+
/// The receiver lagged too far behind. Attempting to receive again will
23+
/// return the oldest message still retained by the channel.
24+
///
25+
/// Includes the number of skipped messages.
26+
Lagged(u64),
27+
}
28+
29+
impl fmt::Display for RecvError {
30+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
31+
match self {
32+
RecvError::Closed => write!(f, "channel closed"),
33+
RecvError::Lagged(amt) => write!(f, "channel lagged by {amt}"),
34+
}
35+
}
36+
}
37+
38+
impl std::error::Error for RecvError {}

packages/events/src/sender.rs

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,38 @@
1+
use std::fmt;
2+
13
use futures::future::BoxFuture;
24
#[cfg(test)]
35
use mockall::{automock, predicate::str};
4-
use tokio::sync::broadcast::error::SendError;
5-
6-
/// Target for tracing crate logs.
7-
pub const EVENTS_TARGET: &str = "EVENTS";
86

97
/// A trait for sending events.
108
#[cfg_attr(test, automock(type Event=();))]
119
pub trait Sender: Sync + Send {
1210
type Event: Send + Clone;
1311

14-
fn send_event(&self, event: Self::Event) -> BoxFuture<'_, Option<Result<usize, SendError<Self::Event>>>>;
12+
/// Sends an event to all active receivers.
13+
///
14+
/// Returns a future that resolves to an `Option<Result<usize, SendError<Self::Event>>>`:
15+
///
16+
/// - `Some(Ok(n))` — the event was successfully sent to `n` receivers.
17+
/// - `Some(Err(e))` — an error occurred while sending the event.
18+
/// - `None` — the sender is inactive or disconnected, and the event was not sent.
19+
///
20+
/// The `Option` allows implementations to express cases where sending is not possible
21+
/// (e.g., when the sender is disabled or there are no active receivers).
22+
///
23+
/// The `usize` typically represents the number of receivers the message was delivered to,
24+
/// but its semantics may vary depending on the concrete implementation.
25+
fn send(&self, event: Self::Event) -> BoxFuture<'_, Option<Result<usize, SendError<Self::Event>>>>;
26+
}
27+
28+
/// Error returned by the [`send`] function on a [`Sender`].
29+
#[derive(Debug)]
30+
pub struct SendError<Event>(pub Event);
31+
32+
impl<Event> fmt::Display for SendError<Event> {
33+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
34+
write!(f, "channel closed")
35+
}
1536
}
37+
38+
impl<Event: fmt::Debug> std::error::Error for SendError<Event> {}

packages/http-tracker-core/benches/helpers/util.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@ use bittorrent_tracker_core::whitelist::authorization::WhitelistAuthorization;
2020
use bittorrent_tracker_core::whitelist::repository::in_memory::InMemoryWhitelist;
2121
use futures::future::BoxFuture;
2222
use mockall::mock;
23-
use tokio::sync::broadcast::error::SendError;
2423
use torrust_tracker_configuration::{Configuration, Core};
24+
use torrust_tracker_events::sender::SendError;
2525
use torrust_tracker_primitives::peer::Peer;
2626
use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch};
2727
use torrust_tracker_test_helpers::configuration;
@@ -127,6 +127,6 @@ mock! {
127127
impl torrust_tracker_events::sender::Sender for HttpStatsEventSender {
128128
type Event = Event;
129129

130-
fn send_event(&self, event: Event) -> BoxFuture<'static,Option<Result<usize,SendError<Event> > > > ;
130+
fn send(&self, event: Event) -> BoxFuture<'static,Option<Result<usize,SendError<Event> > > > ;
131131
}
132132
}

0 commit comments

Comments
 (0)