Skip to content

Commit 4bb7a5a

Browse files
committed
Merge #1476: Refactor event listeners initialization
6d50a78 refactor: normalize container field names (Jose Celano) 74e174d refactor: [#1444] udp server event listener start in app start (Jose Celano) 2fa4e15 refactor: [#1444] udp core event listener start in app start (Jose Celano) 07c5858 refactor: [#1444] http core event listener start in app start. Step 4 (Jose Celano) 2d9af45 chore: [#1444] event listener has to be run manually on tests (Jose Celano) 19bb37d refactor: [#1444] renaem variables (Jose Celano) 6d49a13 refactor: [#1444] rename fields (Jose Celano) 5906037 refactor: [#1444] http core event listener start in app start. Step 3 (Jose Celano) b2cf5d9 refactor: [#1444] rename variable (Jose Celano) 07d1314 refactor: [#1444] http core event listener start in app start. Step 2 (Jose Celano) 17fb909 refactor: [#1444] http core event listener start in app start. Step 1 (Jose Celano) Pull request description: Move the spawning of new tasks for event listeners from app container instantiation to job execution. This will enable us to easily add more event listeners in the future. Additionally, it enhances the way the tracker handles spawned tasks. We should have control over all JoinHandles for all tasks. This promotes a cleaner task lifecycle. Some good practices: - There should always be an owner, responsible for the spawned tasks. No orphan tasks. - It should be possible to gracefully shut down the task. - The state of the task should be known. - Sometimes the owner needs to retrieve data (for example, status) from the task (when it starts or while it's active) or send the task additional commands. ### Subtasks - [x] HTTP Tracker Core event listener - [x] Step 1: Separate factory from spawning task in the current `setup::factory` function. - [x] Step 2: Split factory function into factory and running listener. - [x] Step 3: Add `Keeper` to `HttpTrackerCoreContainer` (so we can run the listener later). - [x] Step 4: Move task creation to app start. - [x] Step 5. Start event listener in test environment. - [x] UDP Tracker Core event listener (Step 1 to 4) - [x] UDP Tracker Server event listener (Step 1 to 4) NOTE: This PR will not include the refactor to allow multiple listeners. See #1385. ACKs for top commit: josecelano: ACK 6d50a78 Tree-SHA512: 7cc637227caf3fc2e86041a25b2e2b0cdf52819dc587b972494c54f3e8337c99472bd38fc0dc76ce4b9811161cdfdf4fb4d1c65fcedea0a90f77584a88f06994
2 parents 831f411 + 6d50a78 commit 4bb7a5a

File tree

41 files changed

+552
-354
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+552
-354
lines changed

packages/axum-http-tracker-server/src/environment.rs

Lines changed: 31 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use bittorrent_http_tracker_core::container::HttpTrackerCoreContainer;
44
use bittorrent_primitives::info_hash::InfoHash;
55
use bittorrent_tracker_core::container::TrackerCoreContainer;
66
use futures::executor::block_on;
7+
use tokio::task::JoinHandle;
78
use torrust_axum_server::tsl::make_rust_tls;
89
use torrust_server_lib::registar::Registar;
910
use torrust_tracker_configuration::{logging, Configuration};
@@ -17,6 +18,7 @@ pub struct Environment<S> {
1718
pub container: Arc<EnvContainer>,
1819
pub registar: Registar,
1920
pub server: HttpServer<S>,
21+
pub event_listener_job: Option<JoinHandle<()>>,
2022
}
2123

2224
impl<S> Environment<S> {
@@ -54,22 +56,32 @@ impl Environment<Stopped> {
5456
container,
5557
registar: Registar::default(),
5658
server,
59+
event_listener_job: None,
5760
}
5861
}
5962

63+
/// Starts the test environment and return a running environment.
64+
///
6065
/// # Panics
6166
///
6267
/// Will panic if the server fails to start.
6368
#[allow(dead_code)]
6469
pub async fn start(self) -> Environment<Running> {
70+
// Start the event listener
71+
let event_listener_job = self.container.http_tracker_core_container.stats_keeper.run_event_listener();
72+
73+
// Start the server
74+
let server = self
75+
.server
76+
.start(self.container.http_tracker_core_container.clone(), self.registar.give_form())
77+
.await
78+
.expect("Failed to start the HTTP tracker server");
79+
6580
Environment {
6681
container: self.container.clone(),
6782
registar: self.registar.clone(),
68-
server: self
69-
.server
70-
.start(self.container.http_tracker_core_container.clone(), self.registar.give_form())
71-
.await
72-
.unwrap(),
83+
server,
84+
event_listener_job: Some(event_listener_job),
7385
}
7486
}
7587
}
@@ -79,14 +91,27 @@ impl Environment<Running> {
7991
Environment::<Stopped>::new(configuration).start().await
8092
}
8193

94+
/// Stops the test environment and return a stopped environment.
95+
///
8296
/// # Panics
8397
///
8498
/// Will panic if the server fails to stop.
8599
pub async fn stop(self) -> Environment<Stopped> {
100+
// Stop the event listener
101+
if let Some(event_listener_job) = self.event_listener_job {
102+
// todo: send a message to the event listener to stop and wait for
103+
// it to finish
104+
event_listener_job.abort();
105+
}
106+
107+
// Stop the server
108+
let server = self.server.stop().await.expect("Failed to stop the HTTP tracker server");
109+
86110
Environment {
87111
container: self.container,
88112
registar: Registar::default(),
89-
server: self.server.stop().await.unwrap(),
113+
server,
114+
event_listener_job: None,
90115
}
91116
}
92117

packages/axum-http-tracker-server/src/server.rs

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -270,11 +270,15 @@ mod tests {
270270

271271
let http_tracker_config = Arc::new(http_tracker_config.clone());
272272

273-
// HTTP stats
274-
let (http_stats_event_sender, http_stats_repository) =
273+
// HTTP core stats
274+
let http_stats_keeper =
275275
bittorrent_http_tracker_core::statistics::setup::factory(configuration.core.tracker_usage_statistics);
276-
let http_stats_event_sender = Arc::new(http_stats_event_sender);
277-
let http_stats_repository = Arc::new(http_stats_repository);
276+
let http_stats_event_sender = http_stats_keeper.sender();
277+
let http_stats_repository = http_stats_keeper.repository();
278+
279+
if configuration.core.tracker_usage_statistics {
280+
let _unused = http_stats_keeper.run_event_listener();
281+
}
278282

279283
let tracker_core_container = Arc::new(TrackerCoreContainer::initialize(&core_config));
280284

@@ -296,8 +300,9 @@ mod tests {
296300
HttpTrackerCoreContainer {
297301
tracker_core_container,
298302
http_tracker_config,
299-
http_stats_event_sender,
300-
http_stats_repository,
303+
stats_keeper: http_stats_keeper,
304+
stats_event_sender: http_stats_event_sender,
305+
stats_repository: http_stats_repository,
301306
announce_service,
302307
scrape_service,
303308
}

packages/axum-http-tracker-server/src/v1/handlers/announce.rs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -160,11 +160,14 @@ mod tests {
160160
&db_torrent_repository,
161161
));
162162

163-
// HTTP stats
164-
let (http_stats_event_sender, http_stats_repository) =
165-
bittorrent_http_tracker_core::statistics::setup::factory(config.core.tracker_usage_statistics);
166-
let http_stats_event_sender = Arc::new(http_stats_event_sender);
167-
let _http_stats_repository = Arc::new(http_stats_repository);
163+
// HTTP core stats
164+
let http_stats_keeper = bittorrent_http_tracker_core::statistics::setup::factory(config.core.tracker_usage_statistics);
165+
let http_stats_event_sender = http_stats_keeper.sender();
166+
let _http_stats_repository = http_stats_keeper.repository();
167+
168+
if config.core.tracker_usage_statistics {
169+
let _unused = http_stats_keeper.run_event_listener();
170+
}
168171

169172
let announce_service = Arc::new(AnnounceService::new(
170173
core_config.clone(),

packages/axum-http-tracker-server/src/v1/handlers/scrape.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -131,10 +131,14 @@ mod tests {
131131
let in_memory_torrent_repository = Arc::new(InMemoryTorrentRepository::default());
132132
let scrape_handler = Arc::new(ScrapeHandler::new(&whitelist_authorization, &in_memory_torrent_repository));
133133

134-
// HTTP stats
135-
let (http_stats_event_sender, _http_stats_repository) =
136-
bittorrent_http_tracker_core::statistics::setup::factory(config.core.tracker_usage_statistics);
137-
let http_stats_event_sender = Arc::new(http_stats_event_sender);
134+
// HTTP core stats
135+
let http_stats_keeper = bittorrent_http_tracker_core::statistics::setup::factory(config.core.tracker_usage_statistics);
136+
let http_stats_event_sender = http_stats_keeper.sender();
137+
let _http_stats_repository = http_stats_keeper.repository();
138+
139+
if config.core.tracker_usage_statistics {
140+
let _unused = http_stats_keeper.run_event_listener();
141+
}
138142

139143
(
140144
CoreTrackerServices {

packages/axum-http-tracker-server/tests/server/v1/contract.rs

Lines changed: 5 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -676,12 +676,7 @@ mod for_all_config_modes {
676676
.announce(&QueryBuilder::default().query())
677677
.await;
678678

679-
let stats = env
680-
.container
681-
.http_tracker_core_container
682-
.http_stats_repository
683-
.get_stats()
684-
.await;
679+
let stats = env.container.http_tracker_core_container.stats_repository.get_stats().await;
685680

686681
assert_eq!(stats.tcp4_announces_handled, 1);
687682

@@ -707,12 +702,7 @@ mod for_all_config_modes {
707702
.announce(&QueryBuilder::default().query())
708703
.await;
709704

710-
let stats = env
711-
.container
712-
.http_tracker_core_container
713-
.http_stats_repository
714-
.get_stats()
715-
.await;
705+
let stats = env.container.http_tracker_core_container.stats_repository.get_stats().await;
716706

717707
assert_eq!(stats.tcp6_announces_handled, 1);
718708

@@ -737,12 +727,7 @@ mod for_all_config_modes {
737727
)
738728
.await;
739729

740-
let stats = env
741-
.container
742-
.http_tracker_core_container
743-
.http_stats_repository
744-
.get_stats()
745-
.await;
730+
let stats = env.container.http_tracker_core_container.stats_repository.get_stats().await;
746731

747732
assert_eq!(stats.tcp6_announces_handled, 0);
748733

@@ -1130,12 +1115,7 @@ mod for_all_config_modes {
11301115
)
11311116
.await;
11321117

1133-
let stats = env
1134-
.container
1135-
.http_tracker_core_container
1136-
.http_stats_repository
1137-
.get_stats()
1138-
.await;
1118+
let stats = env.container.http_tracker_core_container.stats_repository.get_stats().await;
11391119

11401120
assert_eq!(stats.tcp4_scrapes_handled, 1);
11411121

@@ -1167,12 +1147,7 @@ mod for_all_config_modes {
11671147
)
11681148
.await;
11691149

1170-
let stats = env
1171-
.container
1172-
.http_tracker_core_container
1173-
.http_stats_repository
1174-
.get_stats()
1175-
.await;
1150+
let stats = env.container.http_tracker_core_container.stats_repository.get_stats().await;
11761151

11771152
assert_eq!(stats.tcp6_scrapes_handled, 1);
11781153

packages/http-tracker-core/benches/helpers/util.rs

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ use std::net::{IpAddr, Ipv4Addr, SocketAddr};
22
use std::sync::Arc;
33

44
use aquatic_udp_protocol::{AnnounceEvent, NumberOfBytes, PeerId};
5+
use bittorrent_http_tracker_core::event::Event;
6+
use bittorrent_http_tracker_core::{event, statistics};
57
use bittorrent_http_tracker_protocol::v1::requests::announce::Announce;
68
use bittorrent_http_tracker_protocol::v1::services::peer_ip_resolver::ClientIpSources;
79
use bittorrent_primitives::info_hash::InfoHash;
@@ -13,6 +15,9 @@ use bittorrent_tracker_core::torrent::repository::in_memory::InMemoryTorrentRepo
1315
use bittorrent_tracker_core::torrent::repository::persisted::DatabasePersistentTorrentRepository;
1416
use bittorrent_tracker_core::whitelist::authorization::WhitelistAuthorization;
1517
use bittorrent_tracker_core::whitelist::repository::in_memory::InMemoryWhitelist;
18+
use futures::future::BoxFuture;
19+
use mockall::mock;
20+
use tokio::sync::broadcast::error::SendError;
1621
use torrust_tracker_configuration::{Configuration, Core};
1722
use torrust_tracker_primitives::peer::Peer;
1823
use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch};
@@ -50,10 +55,14 @@ pub fn initialize_core_tracker_services_with_config(config: &Configuration) -> (
5055
&db_torrent_repository,
5156
));
5257

53-
// HTTP stats
54-
let (http_stats_event_sender, http_stats_repository) = statistics::setup::factory(config.core.tracker_usage_statistics);
55-
let http_stats_event_sender = Arc::new(http_stats_event_sender);
56-
let _http_stats_repository = Arc::new(http_stats_repository);
58+
// HTTP core stats
59+
let http_stats_keeper = statistics::setup::factory(config.core.tracker_usage_statistics);
60+
let http_stats_event_sender = http_stats_keeper.sender();
61+
let _http_stats_repository = http_stats_keeper.repository();
62+
63+
if config.core.tracker_usage_statistics {
64+
let _unused = http_stats_keeper.run_event_listener();
65+
}
5766

5867
(
5968
CoreTrackerServices {
@@ -105,12 +114,6 @@ pub fn sample_info_hash() -> InfoHash {
105114
.expect("String should be a valid info hash")
106115
}
107116

108-
use bittorrent_http_tracker_core::event::Event;
109-
use bittorrent_http_tracker_core::{event, statistics};
110-
use futures::future::BoxFuture;
111-
use mockall::mock;
112-
use tokio::sync::broadcast::error::SendError;
113-
114117
mock! {
115118
HttpStatsEventSender {}
116119
impl event::sender::Sender for HttpStatsEventSender {

packages/http-tracker-core/src/container.rs

Lines changed: 24 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,9 @@ pub struct HttpTrackerCoreContainer {
1313
pub tracker_core_container: Arc<TrackerCoreContainer>,
1414

1515
// `HttpTrackerCoreServices`
16-
pub http_stats_event_sender: Arc<Option<Box<dyn event::sender::Sender>>>,
17-
pub http_stats_repository: Arc<statistics::repository::Repository>,
16+
pub stats_keeper: Arc<statistics::keeper::Keeper>,
17+
pub stats_event_sender: Arc<Option<Box<dyn event::sender::Sender>>>,
18+
pub stats_repository: Arc<statistics::repository::Repository>,
1819
pub announce_service: Arc<AnnounceService>,
1920
pub scrape_service: Arc<ScrapeService>,
2021
}
@@ -44,35 +45,39 @@ impl HttpTrackerCoreContainer {
4445
Arc::new(Self {
4546
tracker_core_container: tracker_core_container.clone(),
4647
http_tracker_config: http_tracker_config.clone(),
47-
http_stats_event_sender: http_tracker_core_services.http_stats_event_sender.clone(),
48-
http_stats_repository: http_tracker_core_services.http_stats_repository.clone(),
49-
announce_service: http_tracker_core_services.http_announce_service.clone(),
50-
scrape_service: http_tracker_core_services.http_scrape_service.clone(),
48+
stats_keeper: http_tracker_core_services.stats_keeper.clone(),
49+
stats_event_sender: http_tracker_core_services.stats_event_sender.clone(),
50+
stats_repository: http_tracker_core_services.stats_repository.clone(),
51+
announce_service: http_tracker_core_services.announce_service.clone(),
52+
scrape_service: http_tracker_core_services.scrape_service.clone(),
5153
})
5254
}
5355
}
5456

5557
pub struct HttpTrackerCoreServices {
56-
pub http_stats_event_sender: Arc<Option<Box<dyn event::sender::Sender>>>,
57-
pub http_stats_repository: Arc<statistics::repository::Repository>,
58-
pub http_announce_service: Arc<services::announce::AnnounceService>,
59-
pub http_scrape_service: Arc<services::scrape::ScrapeService>,
58+
pub stats_keeper: Arc<statistics::keeper::Keeper>,
59+
pub stats_event_sender: Arc<Option<Box<dyn event::sender::Sender>>>,
60+
pub stats_repository: Arc<statistics::repository::Repository>,
61+
pub announce_service: Arc<services::announce::AnnounceService>,
62+
pub scrape_service: Arc<services::scrape::ScrapeService>,
6063
}
6164

6265
impl HttpTrackerCoreServices {
6366
#[must_use]
6467
pub fn initialize_from(tracker_core_container: &Arc<TrackerCoreContainer>) -> Arc<Self> {
65-
let (http_stats_event_sender, http_stats_repository) =
66-
statistics::setup::factory(tracker_core_container.core_config.tracker_usage_statistics);
67-
let http_stats_event_sender = Arc::new(http_stats_event_sender);
68-
let http_stats_repository = Arc::new(http_stats_repository);
68+
// HTTP core stats
69+
let http_stats_keeper = statistics::setup::factory(tracker_core_container.core_config.tracker_usage_statistics);
70+
let http_stats_event_sender = http_stats_keeper.sender();
71+
let http_stats_repository = http_stats_keeper.repository();
72+
6973
let http_announce_service = Arc::new(AnnounceService::new(
7074
tracker_core_container.core_config.clone(),
7175
tracker_core_container.announce_handler.clone(),
7276
tracker_core_container.authentication_service.clone(),
7377
tracker_core_container.whitelist_authorization.clone(),
7478
http_stats_event_sender.clone(),
7579
));
80+
7681
let http_scrape_service = Arc::new(ScrapeService::new(
7782
tracker_core_container.core_config.clone(),
7883
tracker_core_container.scrape_handler.clone(),
@@ -81,10 +86,11 @@ impl HttpTrackerCoreServices {
8186
));
8287

8388
Arc::new(Self {
84-
http_stats_event_sender,
85-
http_stats_repository,
86-
http_announce_service,
87-
http_scrape_service,
89+
stats_keeper: http_stats_keeper,
90+
stats_event_sender: http_stats_event_sender,
91+
stats_repository: http_stats_repository,
92+
announce_service: http_announce_service,
93+
scrape_service: http_scrape_service,
8894
})
8995
}
9096
}

packages/http-tracker-core/src/event/sender.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ pub trait Sender: Sync + Send {
1616
}
1717

1818
/// An event sender implementation using a broadcast channel.
19+
#[derive(Clone)]
1920
pub struct Broadcaster {
2021
pub(crate) sender: broadcast::Sender<Event>,
2122
}

packages/http-tracker-core/src/services/announce.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -252,10 +252,14 @@ mod tests {
252252
&db_torrent_repository,
253253
));
254254

255-
// HTTP stats
256-
let (http_stats_event_sender, http_stats_repository) = statistics::setup::factory(config.core.tracker_usage_statistics);
257-
let http_stats_event_sender = Arc::new(http_stats_event_sender);
258-
let _http_stats_repository = Arc::new(http_stats_repository);
255+
// HTTP core stats
256+
let http_stats_keeper = statistics::setup::factory(config.core.tracker_usage_statistics);
257+
let http_stats_event_sender = http_stats_keeper.sender();
258+
let _http_stats_repository = http_stats_keeper.repository();
259+
260+
if config.core.tracker_usage_statistics {
261+
let _unused = http_stats_keeper.run_event_listener();
262+
}
259263

260264
(
261265
CoreTrackerServices {

0 commit comments

Comments
 (0)