Skip to content

Commit 419f790

Browse files
committed
Merge #1479: Overhaul stats: Extract events package (part 1)
b0951aa refactor: [#1478] move EventBus to event mod (Jose Celano) b32072c refactor: [#1478] rename struct fields for Keeper type (Jose Celano) c103b60 refactor: [#1478] rename Keeper variables (Jose Celano) 8a9314b refactor: [#1478] rename Keeper to EventBus (Jose Celano) a660be8 refactor: [#1478] inline factory fn in udp server stats (Jose Celano) 8e8b1dd refactor: [#1478] inline factory fn in udp core stats (Jose Celano) cc7ead8 refactor: [#1478] inline factory fn in http core stats (Jose Celano) a055ab9 refactor: [#1478] inline keeper_factory fn (Jose Celano) 5f38357 refactor: [#1478] decouple events from stats in udp server keeper (Jose Celano) f9f13a4 refactor: [#1478] decouple events from stats in udp core keeper (Jose Celano) f25438a refactor: [#1478] decouple events from stats in http core keeper (Jose Celano) Pull request description: Overhaul stats: Extract events package. ### Subtasks - [x] Decouple events from stats in the http core keeper. - [x] Decouple events from stats in the udp core keeper. - [x] Decouple events from stats in the udp server keeper. - [x] Inline function `statistics::setup::keeper_factory` and `statistics::setup::factory`. No need for them anymore. - [x] Rename all `Keeper` types to `EventBus`. - [x] Move `EventBus` to `event` module. ### Implement in part 2 of the PR - Create new package `events`. - Create a generic `EventBus` in the `events` package and replace concrete versions in packages. More info in #1480. ACKs for top commit: josecelano: ACK b0951aa Tree-SHA512: 48337ed46d234f2d6c77df514ce55b8f2abbc0f3191ce3a818d88917eba95f3bd03291f4690501e5adc969bb260167fc968eea0a3790d43350083ece5cbe0d12
2 parents 4bb7a5a + b0951aa commit 419f790

File tree

40 files changed

+444
-559
lines changed

40 files changed

+444
-559
lines changed

packages/axum-http-tracker-server/src/environment.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use std::sync::Arc;
22

33
use bittorrent_http_tracker_core::container::HttpTrackerCoreContainer;
4+
use bittorrent_http_tracker_core::statistics::event::listener::run_event_listener;
45
use bittorrent_primitives::info_hash::InfoHash;
56
use bittorrent_tracker_core::container::TrackerCoreContainer;
67
use futures::executor::block_on;
@@ -68,7 +69,10 @@ impl Environment<Stopped> {
6869
#[allow(dead_code)]
6970
pub async fn start(self) -> Environment<Running> {
7071
// Start the event listener
71-
let event_listener_job = self.container.http_tracker_core_container.stats_keeper.run_event_listener();
72+
let event_listener_job = run_event_listener(
73+
self.container.http_tracker_core_container.event_bus.receiver(),
74+
&self.container.http_tracker_core_container.stats_repository,
75+
);
7276

7377
// Start the server
7478
let server = self

packages/axum-http-tracker-server/src/server.rs

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -248,8 +248,12 @@ mod tests {
248248
use std::sync::Arc;
249249

250250
use bittorrent_http_tracker_core::container::HttpTrackerCoreContainer;
251+
use bittorrent_http_tracker_core::event::bus::EventBus;
252+
use bittorrent_http_tracker_core::event::sender::Broadcaster;
251253
use bittorrent_http_tracker_core::services::announce::AnnounceService;
252254
use bittorrent_http_tracker_core::services::scrape::ScrapeService;
255+
use bittorrent_http_tracker_core::statistics::event::listener::run_event_listener;
256+
use bittorrent_http_tracker_core::statistics::repository::Repository;
253257
use bittorrent_tracker_core::container::TrackerCoreContainer;
254258
use torrust_axum_server::tsl::make_rust_tls;
255259
use torrust_server_lib::registar::Registar;
@@ -271,13 +275,17 @@ mod tests {
271275
let http_tracker_config = Arc::new(http_tracker_config.clone());
272276

273277
// HTTP core stats
274-
let http_stats_keeper =
275-
bittorrent_http_tracker_core::statistics::setup::factory(configuration.core.tracker_usage_statistics);
276-
let http_stats_event_sender = http_stats_keeper.sender();
277-
let http_stats_repository = http_stats_keeper.repository();
278+
let http_core_broadcaster = Broadcaster::default();
279+
let http_stats_repository = Arc::new(Repository::new());
280+
let http_stats_event_bus = Arc::new(EventBus::new(
281+
configuration.core.tracker_usage_statistics,
282+
http_core_broadcaster.clone(),
283+
));
284+
285+
let http_stats_event_sender = http_stats_event_bus.sender();
278286

279287
if configuration.core.tracker_usage_statistics {
280-
let _unused = http_stats_keeper.run_event_listener();
288+
let _unused = run_event_listener(http_stats_event_bus.receiver(), &http_stats_repository);
281289
}
282290

283291
let tracker_core_container = Arc::new(TrackerCoreContainer::initialize(&core_config));
@@ -300,7 +308,7 @@ mod tests {
300308
HttpTrackerCoreContainer {
301309
tracker_core_container,
302310
http_tracker_config,
303-
stats_keeper: http_stats_keeper,
311+
event_bus: http_stats_event_bus,
304312
stats_event_sender: http_stats_event_sender,
305313
stats_repository: http_stats_repository,
306314
announce_service,

packages/axum-http-tracker-server/src/v1/handlers/announce.rs

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,11 @@ mod tests {
107107
use std::sync::Arc;
108108

109109
use aquatic_udp_protocol::PeerId;
110+
use bittorrent_http_tracker_core::event::bus::EventBus;
111+
use bittorrent_http_tracker_core::event::sender::Broadcaster;
110112
use bittorrent_http_tracker_core::services::announce::AnnounceService;
113+
use bittorrent_http_tracker_core::statistics::event::listener::run_event_listener;
114+
use bittorrent_http_tracker_core::statistics::repository::Repository;
111115
use bittorrent_http_tracker_protocol::v1::requests::announce::Announce;
112116
use bittorrent_http_tracker_protocol::v1::responses;
113117
use bittorrent_http_tracker_protocol::v1::services::peer_ip_resolver::ClientIpSources;
@@ -161,12 +165,17 @@ mod tests {
161165
));
162166

163167
// HTTP core stats
164-
let http_stats_keeper = bittorrent_http_tracker_core::statistics::setup::factory(config.core.tracker_usage_statistics);
165-
let http_stats_event_sender = http_stats_keeper.sender();
166-
let _http_stats_repository = http_stats_keeper.repository();
168+
let http_core_broadcaster = Broadcaster::default();
169+
let http_stats_repository = Arc::new(Repository::new());
170+
let http_stats_event_bus = Arc::new(EventBus::new(
171+
config.core.tracker_usage_statistics,
172+
http_core_broadcaster.clone(),
173+
));
174+
175+
let http_stats_event_sender = http_stats_event_bus.sender();
167176

168177
if config.core.tracker_usage_statistics {
169-
let _unused = http_stats_keeper.run_event_listener();
178+
let _unused = run_event_listener(http_stats_event_bus.receiver(), &http_stats_repository);
170179
}
171180

172181
let announce_service = Arc::new(AnnounceService::new(

packages/axum-http-tracker-server/src/v1/handlers/scrape.rs

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,10 @@ mod tests {
8383
use std::str::FromStr;
8484
use std::sync::Arc;
8585

86+
use bittorrent_http_tracker_core::event::bus::EventBus;
87+
use bittorrent_http_tracker_core::event::sender::Broadcaster;
88+
use bittorrent_http_tracker_core::statistics::event::listener::run_event_listener;
89+
use bittorrent_http_tracker_core::statistics::repository::Repository;
8690
use bittorrent_http_tracker_protocol::v1::requests::scrape::Scrape;
8791
use bittorrent_http_tracker_protocol::v1::responses;
8892
use bittorrent_http_tracker_protocol::v1::services::peer_ip_resolver::ClientIpSources;
@@ -132,12 +136,17 @@ mod tests {
132136
let scrape_handler = Arc::new(ScrapeHandler::new(&whitelist_authorization, &in_memory_torrent_repository));
133137

134138
// HTTP core stats
135-
let http_stats_keeper = bittorrent_http_tracker_core::statistics::setup::factory(config.core.tracker_usage_statistics);
136-
let http_stats_event_sender = http_stats_keeper.sender();
137-
let _http_stats_repository = http_stats_keeper.repository();
139+
let http_core_broadcaster = Broadcaster::default();
140+
let http_stats_repository = Arc::new(Repository::new());
141+
let http_stats_event_bus = Arc::new(EventBus::new(
142+
config.core.tracker_usage_statistics,
143+
http_core_broadcaster.clone(),
144+
));
145+
146+
let http_stats_event_sender = http_stats_event_bus.sender();
138147

139148
if config.core.tracker_usage_statistics {
140-
let _unused = http_stats_keeper.run_event_listener();
149+
let _unused = run_event_listener(http_stats_event_bus.receiver(), &http_stats_repository);
141150
}
142151

143152
(

packages/http-tracker-core/benches/helpers/util.rs

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,12 @@ use std::net::{IpAddr, Ipv4Addr, SocketAddr};
22
use std::sync::Arc;
33

44
use aquatic_udp_protocol::{AnnounceEvent, NumberOfBytes, PeerId};
5+
use bittorrent_http_tracker_core::event;
6+
use bittorrent_http_tracker_core::event::bus::EventBus;
7+
use bittorrent_http_tracker_core::event::sender::Broadcaster;
58
use bittorrent_http_tracker_core::event::Event;
6-
use bittorrent_http_tracker_core::{event, statistics};
9+
use bittorrent_http_tracker_core::statistics::event::listener::run_event_listener;
10+
use bittorrent_http_tracker_core::statistics::repository::Repository;
711
use bittorrent_http_tracker_protocol::v1::requests::announce::Announce;
812
use bittorrent_http_tracker_protocol::v1::services::peer_ip_resolver::ClientIpSources;
913
use bittorrent_primitives::info_hash::InfoHash;
@@ -56,12 +60,17 @@ pub fn initialize_core_tracker_services_with_config(config: &Configuration) -> (
5660
));
5761

5862
// HTTP core stats
59-
let http_stats_keeper = statistics::setup::factory(config.core.tracker_usage_statistics);
60-
let http_stats_event_sender = http_stats_keeper.sender();
61-
let _http_stats_repository = http_stats_keeper.repository();
63+
let http_core_broadcaster = Broadcaster::default();
64+
let http_stats_repository = Arc::new(Repository::new());
65+
let http_stats_event_bus = Arc::new(EventBus::new(
66+
config.core.tracker_usage_statistics,
67+
http_core_broadcaster.clone(),
68+
));
69+
70+
let http_stats_event_sender = http_stats_event_bus.sender();
6271

6372
if config.core.tracker_usage_statistics {
64-
let _unused = http_stats_keeper.run_event_listener();
73+
let _unused = run_event_listener(http_stats_event_bus.receiver(), &http_stats_repository);
6574
}
6675

6776
(

packages/http-tracker-core/src/container.rs

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,11 @@ use std::sync::Arc;
33
use bittorrent_tracker_core::container::TrackerCoreContainer;
44
use torrust_tracker_configuration::{Core, HttpTracker};
55

6+
use crate::event::bus::EventBus;
7+
use crate::event::sender::Broadcaster;
68
use crate::services::announce::AnnounceService;
79
use crate::services::scrape::ScrapeService;
10+
use crate::statistics::repository::Repository;
811
use crate::{event, services, statistics};
912

1013
pub struct HttpTrackerCoreContainer {
@@ -13,7 +16,7 @@ pub struct HttpTrackerCoreContainer {
1316
pub tracker_core_container: Arc<TrackerCoreContainer>,
1417

1518
// `HttpTrackerCoreServices`
16-
pub stats_keeper: Arc<statistics::keeper::Keeper>,
19+
pub event_bus: Arc<event::bus::EventBus>,
1720
pub stats_event_sender: Arc<Option<Box<dyn event::sender::Sender>>>,
1821
pub stats_repository: Arc<statistics::repository::Repository>,
1922
pub announce_service: Arc<AnnounceService>,
@@ -45,7 +48,7 @@ impl HttpTrackerCoreContainer {
4548
Arc::new(Self {
4649
tracker_core_container: tracker_core_container.clone(),
4750
http_tracker_config: http_tracker_config.clone(),
48-
stats_keeper: http_tracker_core_services.stats_keeper.clone(),
51+
event_bus: http_tracker_core_services.event_bus.clone(),
4952
stats_event_sender: http_tracker_core_services.stats_event_sender.clone(),
5053
stats_repository: http_tracker_core_services.stats_repository.clone(),
5154
announce_service: http_tracker_core_services.announce_service.clone(),
@@ -55,7 +58,7 @@ impl HttpTrackerCoreContainer {
5558
}
5659

5760
pub struct HttpTrackerCoreServices {
58-
pub stats_keeper: Arc<statistics::keeper::Keeper>,
61+
pub event_bus: Arc<event::bus::EventBus>,
5962
pub stats_event_sender: Arc<Option<Box<dyn event::sender::Sender>>>,
6063
pub stats_repository: Arc<statistics::repository::Repository>,
6164
pub announce_service: Arc<services::announce::AnnounceService>,
@@ -66,9 +69,14 @@ impl HttpTrackerCoreServices {
6669
#[must_use]
6770
pub fn initialize_from(tracker_core_container: &Arc<TrackerCoreContainer>) -> Arc<Self> {
6871
// HTTP core stats
69-
let http_stats_keeper = statistics::setup::factory(tracker_core_container.core_config.tracker_usage_statistics);
70-
let http_stats_event_sender = http_stats_keeper.sender();
71-
let http_stats_repository = http_stats_keeper.repository();
72+
let http_core_broadcaster = Broadcaster::default();
73+
let http_stats_repository = Arc::new(Repository::new());
74+
let http_stats_event_bus = Arc::new(EventBus::new(
75+
tracker_core_container.core_config.tracker_usage_statistics,
76+
http_core_broadcaster.clone(),
77+
));
78+
79+
let http_stats_event_sender = http_stats_event_bus.sender();
7280

7381
let http_announce_service = Arc::new(AnnounceService::new(
7482
tracker_core_container.core_config.clone(),
@@ -86,7 +94,7 @@ impl HttpTrackerCoreServices {
8694
));
8795

8896
Arc::new(Self {
89-
stats_keeper: http_stats_keeper,
97+
event_bus: http_stats_event_bus,
9098
stats_event_sender: http_stats_event_sender,
9199
stats_repository: http_stats_repository,
92100
announce_service: http_announce_service,
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
use std::sync::Arc;
2+
3+
use tokio::sync::broadcast::Receiver;
4+
5+
use crate::event::sender::{self, Broadcaster};
6+
use crate::event::Event;
7+
8+
pub struct EventBus {
9+
pub enable_sender: bool,
10+
pub broadcaster: Broadcaster,
11+
}
12+
13+
impl Default for EventBus {
14+
fn default() -> Self {
15+
let enable_sender = true;
16+
let broadcaster = Broadcaster::default();
17+
18+
Self::new(enable_sender, broadcaster)
19+
}
20+
}
21+
22+
impl EventBus {
23+
#[must_use]
24+
pub fn new(enable_sender: bool, broadcaster: Broadcaster) -> Self {
25+
Self {
26+
enable_sender,
27+
broadcaster,
28+
}
29+
}
30+
31+
#[must_use]
32+
pub fn sender(&self) -> Arc<Option<Box<dyn sender::Sender>>> {
33+
if self.enable_sender {
34+
Arc::new(Some(Box::new(self.broadcaster.clone())))
35+
} else {
36+
Arc::new(None)
37+
}
38+
}
39+
40+
#[must_use]
41+
pub fn receiver(&self) -> Receiver<Event> {
42+
self.broadcaster.subscribe()
43+
}
44+
}

packages/http-tracker-core/src/event/mod.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
pub mod bus;
2+
pub mod sender;
3+
14
use std::net::{IpAddr, SocketAddr};
25

36
use bittorrent_http_tracker_protocol::v1::services::peer_ip_resolver::RemoteClientAddr;
@@ -7,8 +10,6 @@ use torrust_tracker_metrics::label_name;
710
use torrust_tracker_primitives::peer::PeerAnnouncement;
811
use torrust_tracker_primitives::service_binding::ServiceBinding;
912

10-
pub mod sender;
11-
1213
/// A HTTP core event.
1314
#[derive(Debug, PartialEq, Eq, Clone)]
1415
pub enum Event {

packages/http-tracker-core/src/services/announce.rs

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -253,12 +253,17 @@ mod tests {
253253
));
254254

255255
// HTTP core stats
256-
let http_stats_keeper = statistics::setup::factory(config.core.tracker_usage_statistics);
257-
let http_stats_event_sender = http_stats_keeper.sender();
258-
let _http_stats_repository = http_stats_keeper.repository();
256+
let http_core_broadcaster = Broadcaster::default();
257+
let http_stats_repository = Arc::new(Repository::new());
258+
let http_stats_event_bus = Arc::new(EventBus::new(
259+
config.core.tracker_usage_statistics,
260+
http_core_broadcaster.clone(),
261+
));
262+
263+
let http_stats_event_sender = http_stats_event_bus.sender();
259264

260265
if config.core.tracker_usage_statistics {
261-
let _unused = http_stats_keeper.run_event_listener();
266+
let _unused = run_event_listener(http_stats_event_bus.receiver(), &http_stats_repository);
262267
}
263268

264269
(
@@ -297,9 +302,13 @@ mod tests {
297302
use mockall::mock;
298303
use tokio::sync::broadcast::error::SendError;
299304

305+
use crate::event;
306+
use crate::event::bus::EventBus;
307+
use crate::event::sender::Broadcaster;
300308
use crate::event::Event;
309+
use crate::statistics::event::listener::run_event_listener;
310+
use crate::statistics::repository::Repository;
301311
use crate::tests::sample_info_hash;
302-
use crate::{event, statistics};
303312

304313
mock! {
305314
HttpStatsEventSender {}

packages/http-tracker-core/src/services/scrape.rs

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -259,23 +259,26 @@ mod tests {
259259
use torrust_tracker_primitives::swarm_metadata::SwarmMetadata;
260260
use torrust_tracker_test_helpers::configuration;
261261

262+
use crate::event;
263+
use crate::event::bus::EventBus;
264+
use crate::event::sender::Broadcaster;
262265
use crate::event::{ConnectionContext, Event};
263266
use crate::services::scrape::tests::{
264267
initialize_services_with_configuration, sample_info_hashes, sample_peer, MockHttpStatsEventSender,
265268
};
266269
use crate::services::scrape::ScrapeService;
267270
use crate::tests::sample_info_hash;
268-
use crate::{event, statistics};
269271

270272
#[tokio::test]
271273
async fn it_should_return_the_scrape_data_for_a_torrent() {
272274
let configuration = configuration::ephemeral_public();
273275
let core_config = Arc::new(configuration.core.clone());
274276

275277
// HTTP core stats
276-
let http_stats_keeper = statistics::setup::factory(false);
277-
let http_stats_event_sender = http_stats_keeper.sender();
278-
let _http_stats_repository = http_stats_keeper.repository();
278+
let http_core_broadcaster = Broadcaster::default();
279+
let http_stats_event_bus = Arc::new(EventBus::new(false, http_core_broadcaster.clone()));
280+
281+
let http_stats_event_sender = http_stats_event_bus.sender();
279282

280283
let container = initialize_services_with_configuration(&configuration);
281284

@@ -449,13 +452,15 @@ mod tests {
449452
use torrust_tracker_primitives::service_binding::{Protocol, ServiceBinding};
450453
use torrust_tracker_test_helpers::configuration;
451454

455+
use crate::event;
456+
use crate::event::bus::EventBus;
457+
use crate::event::sender::Broadcaster;
452458
use crate::event::{ConnectionContext, Event};
453459
use crate::services::scrape::tests::{
454460
initialize_services_with_configuration, sample_info_hashes, sample_peer, MockHttpStatsEventSender,
455461
};
456462
use crate::services::scrape::ScrapeService;
457463
use crate::tests::sample_info_hash;
458-
use crate::{event, statistics};
459464

460465
#[tokio::test]
461466
async fn it_should_return_the_zeroed_scrape_data_when_the_tracker_is_running_in_private_mode_and_the_peer_is_not_authenticated(
@@ -465,9 +470,10 @@ mod tests {
465470
let container = initialize_services_with_configuration(&config);
466471

467472
// HTTP core stats
468-
let http_stats_keeper = statistics::setup::factory(false);
469-
let http_stats_event_sender = http_stats_keeper.sender();
470-
let _http_stats_repository = http_stats_keeper.repository();
473+
let http_core_broadcaster = Broadcaster::default();
474+
let http_stats_event_bus = Arc::new(EventBus::new(false, http_core_broadcaster.clone()));
475+
476+
let http_stats_event_sender = http_stats_event_bus.sender();
471477

472478
let info_hash = sample_info_hash();
473479
let info_hashes = vec![info_hash];

0 commit comments

Comments
 (0)