Skip to content

Commit 04de1df

Browse files
committed
Merge #1567: Overhaul stats: Decouple stats and ban service event listeners in udp-tracker-server package
d81e59e fix: [#1565] ban service should work with stats disabled (Jose Celano) Pull request description: Fix bug. The ban service should work even when statistics are disabled. There was only one event listener in the UDP tracker server package to update stats and update ban service data (increase banning counters). However, the listener was disabled when stats were not needed. The ban service should be always be enabled (or at least enable/disable it independently from stats). The problem has been fixed by creating an independent event listener for the banning service that is always enabled. In the future, we could add a config option to enable/disable it. ACKs for top commit: josecelano: ACK d81e59e Tree-SHA512: 46bbbc8fa97be929b9541fa96c953f2d6b9212edb52c95971b6dae0dba4abf3a6b52a931d5570484a80b2a33bf347a97823d15dec9e7a6429c84f4f74eda07fb
2 parents d949e48 + d81e59e commit 04de1df

File tree

16 files changed

+137
-105
lines changed

16 files changed

+137
-105
lines changed
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
use std::sync::Arc;
2+
3+
use bittorrent_udp_tracker_core::services::banning::BanService;
4+
use tokio::sync::RwLock;
5+
use torrust_tracker_primitives::DurationSinceUnixEpoch;
6+
7+
use crate::event::{ErrorKind, Event};
8+
9+
pub async fn handle_event(event: Event, ban_service: &Arc<RwLock<BanService>>, _now: DurationSinceUnixEpoch) {
10+
if let Event::UdpError {
11+
context,
12+
kind: _,
13+
error: ErrorKind::ConnectionCookie(_msg),
14+
} = event
15+
{
16+
let mut ban_service = ban_service.write().await;
17+
ban_service.increase_counter(&context.client_socket_addr().ip());
18+
}
19+
}
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
use std::sync::Arc;
2+
3+
use bittorrent_udp_tracker_core::services::banning::BanService;
4+
use bittorrent_udp_tracker_core::UDP_TRACKER_LOG_TARGET;
5+
use tokio::sync::RwLock;
6+
use tokio::task::JoinHandle;
7+
use torrust_tracker_clock::clock::Time;
8+
use torrust_tracker_events::receiver::RecvError;
9+
10+
use super::handler::handle_event;
11+
use crate::event::receiver::Receiver;
12+
use crate::CurrentClock;
13+
14+
#[must_use]
15+
pub fn run_event_listener(receiver: Receiver, ban_service: &Arc<RwLock<BanService>>) -> JoinHandle<()> {
16+
let ban_service_clone = ban_service.clone();
17+
18+
tracing::info!(target: UDP_TRACKER_LOG_TARGET, "Starting UDP tracker server event listener (banning)");
19+
20+
tokio::spawn(async move {
21+
dispatch_events(receiver, ban_service_clone).await;
22+
23+
tracing::info!(target: UDP_TRACKER_LOG_TARGET, "UDP tracker server event listener (banning) finished");
24+
})
25+
}
26+
27+
async fn dispatch_events(mut receiver: Receiver, ban_service: Arc<RwLock<BanService>>) {
28+
let shutdown_signal = tokio::signal::ctrl_c();
29+
tokio::pin!(shutdown_signal);
30+
31+
loop {
32+
tokio::select! {
33+
biased;
34+
35+
_ = &mut shutdown_signal => {
36+
tracing::info!(target: UDP_TRACKER_LOG_TARGET, "Received Ctrl+C, shutting down UDP tracker server event listener (banning)");
37+
break;
38+
}
39+
40+
result = receiver.recv() => {
41+
match result {
42+
Ok(event) => handle_event(event, &ban_service, CurrentClock::now()).await,
43+
Err(e) => {
44+
match e {
45+
RecvError::Closed => {
46+
tracing::info!(target: UDP_TRACKER_LOG_TARGET, "Udp server receiver (banning) closed.");
47+
break;
48+
}
49+
RecvError::Lagged(n) => {
50+
tracing::warn!(target: UDP_TRACKER_LOG_TARGET, "Udp server receiver (banning) lagged by {} events.", n);
51+
}
52+
}
53+
}
54+
}
55+
}
56+
}
57+
}
58+
}
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
pub mod handler;
2+
pub mod listener;
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
pub mod event;

packages/udp-tracker-server/src/environment.rs

Lines changed: 25 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,11 @@
11
use std::net::SocketAddr;
22
use std::sync::Arc;
33

4-
use bittorrent_primitives::info_hash::InfoHash;
54
use bittorrent_tracker_core::container::TrackerCoreContainer;
65
use bittorrent_udp_tracker_core::container::UdpTrackerCoreContainer;
76
use tokio::task::JoinHandle;
87
use torrust_server_lib::registar::Registar;
98
use torrust_tracker_configuration::{logging, Configuration, DEFAULT_TIMEOUT};
10-
use torrust_tracker_primitives::peer;
119
use torrust_tracker_swarm_coordination_registry::container::SwarmCoordinationRegistryContainer;
1210

1311
use crate::container::UdpTrackerServerContainer;
@@ -25,22 +23,8 @@ where
2523
pub registar: Registar,
2624
pub server: Server<S>,
2725
pub udp_core_event_listener_job: Option<JoinHandle<()>>,
28-
pub udp_server_event_listener_job: Option<JoinHandle<()>>,
29-
}
30-
31-
impl<S> Environment<S>
32-
where
33-
S: std::fmt::Debug + std::fmt::Display,
34-
{
35-
/// Add a torrent to the tracker
36-
#[allow(dead_code)]
37-
pub async fn add_torrent(&self, info_hash: &InfoHash, peer: &peer::Peer) {
38-
self.container
39-
.tracker_core_container
40-
.in_memory_torrent_repository
41-
.handle_announcement(info_hash, peer, None)
42-
.await;
43-
}
26+
pub udp_server_stats_event_listener_job: Option<JoinHandle<()>>,
27+
pub udp_server_banning_event_listener_job: Option<JoinHandle<()>>,
4428
}
4529

4630
impl Environment<Stopped> {
@@ -60,7 +44,8 @@ impl Environment<Stopped> {
6044
registar: Registar::default(),
6145
server,
6246
udp_core_event_listener_job: None,
63-
udp_server_event_listener_job: None,
47+
udp_server_stats_event_listener_job: None,
48+
udp_server_banning_event_listener_job: None,
6449
}
6550
}
6651

@@ -78,10 +63,15 @@ impl Environment<Stopped> {
7863
&self.container.udp_tracker_core_container.stats_repository,
7964
));
8065

81-
// Start the UDP tracker server event listener
82-
let udp_server_event_listener_job = Some(crate::statistics::event::listener::run_event_listener(
66+
// Start the UDP tracker server event listener (statistics)
67+
let udp_server_stats_event_listener_job = Some(crate::statistics::event::listener::run_event_listener(
8368
self.container.udp_tracker_server_container.event_bus.receiver(),
8469
&self.container.udp_tracker_server_container.stats_repository,
70+
));
71+
72+
// Start the UDP tracker server event listener (banning)
73+
let udp_server_banning_event_listener_job = Some(crate::banning::event::listener::run_event_listener(
74+
self.container.udp_tracker_server_container.event_bus.receiver(),
8575
&self.container.udp_tracker_core_container.ban_service,
8676
));
8777

@@ -102,7 +92,8 @@ impl Environment<Stopped> {
10292
registar: self.registar.clone(),
10393
server,
10494
udp_core_event_listener_job,
105-
udp_server_event_listener_job,
95+
udp_server_stats_event_listener_job,
96+
udp_server_banning_event_listener_job,
10697
}
10798
}
10899
}
@@ -131,11 +122,18 @@ impl Environment<Running> {
131122
udp_core_event_listener_job.abort();
132123
}
133124

134-
// Stop the UDP tracker server event listener
135-
if let Some(udp_server_event_listener_job) = self.udp_server_event_listener_job {
125+
// Stop the UDP tracker server event listener (statistics)
126+
if let Some(udp_server_stats_event_listener_job) = self.udp_server_stats_event_listener_job {
127+
// todo: send a message to the event listener to stop and wait for
128+
// it to finish
129+
udp_server_stats_event_listener_job.abort();
130+
}
131+
132+
// Stop the UDP tracker server event listener (banning)
133+
if let Some(udp_server_banning_event_listener_job) = self.udp_server_banning_event_listener_job {
136134
// todo: send a message to the event listener to stop and wait for
137135
// it to finish
138-
udp_server_event_listener_job.abort();
136+
udp_server_banning_event_listener_job.abort();
139137
}
140138

141139
// Stop the UDP tracker server
@@ -149,7 +147,8 @@ impl Environment<Running> {
149147
registar: Registar::default(),
150148
server,
151149
udp_core_event_listener_job: None,
152-
udp_server_event_listener_job: None,
150+
udp_server_stats_event_listener_job: None,
151+
udp_server_banning_event_listener_job: None,
153152
}
154153
}
155154

packages/udp-tracker-server/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -634,6 +634,7 @@
634634
//! documentation by [Arvid Norberg](https://github.com/arvidn) was very
635635
//! supportive in the development of this documentation. Some descriptions were
636636
//! taken from the [libtorrent](https://www.rasterbar.com/products/libtorrent/udp_tracker_protocol.html).
637+
pub mod banning;
637638
pub mod container;
638639
pub mod environment;
639640
pub mod error;

packages/udp-tracker-server/src/statistics/event/handler/error.rs

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,4 @@
1-
use std::sync::Arc;
2-
31
use aquatic_udp_protocol::PeerClient;
4-
use bittorrent_udp_tracker_core::services::banning::BanService;
5-
use tokio::sync::RwLock;
62
use torrust_tracker_metrics::label::LabelSet;
73
use torrust_tracker_metrics::{label_name, metric_name};
84
use torrust_tracker_primitives::DurationSinceUnixEpoch;
@@ -16,16 +12,9 @@ pub async fn handle_event(
1612
opt_udp_request_kind: Option<UdpRequestKind>,
1713
error_kind: ErrorKind,
1814
repository: &Repository,
19-
ban_service: &Arc<RwLock<BanService>>,
2015
now: DurationSinceUnixEpoch,
2116
) {
22-
if let ErrorKind::ConnectionCookie(_msg) = error_kind.clone() {
23-
let mut ban_service = ban_service.write().await;
24-
ban_service.increase_counter(&connection_context.client_socket_addr().ip());
25-
}
26-
2717
update_global_fixed_metrics(&connection_context, repository).await;
28-
2918
update_extendable_metrics(&connection_context, opt_udp_request_kind, error_kind, repository, now).await;
3019
}
3120

@@ -126,9 +115,7 @@ fn extract_name_and_version(peer_client: &PeerClient) -> (String, String) {
126115
#[cfg(test)]
127116
mod tests {
128117
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
129-
use std::sync::Arc;
130118

131-
use bittorrent_udp_tracker_core::services::banning::BanService;
132119
use torrust_tracker_clock::clock::Time;
133120
use torrust_tracker_primitives::service_binding::{Protocol, ServiceBinding};
134121

@@ -141,7 +128,6 @@ mod tests {
141128
#[tokio::test]
142129
async fn should_increase_the_udp4_errors_counter_when_it_receives_a_udp4_error_event() {
143130
let stats_repository = Repository::new();
144-
let ban_service = Arc::new(tokio::sync::RwLock::new(BanService::new(1)));
145131

146132
handle_event(
147133
Event::UdpError {
@@ -157,7 +143,6 @@ mod tests {
157143
error: ErrorKind::RequestParse("Invalid request format".to_string()),
158144
},
159145
&stats_repository,
160-
&ban_service,
161146
CurrentClock::now(),
162147
)
163148
.await;

packages/udp-tracker-server/src/statistics/event/handler/mod.rs

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,21 +5,12 @@ mod request_banned;
55
mod request_received;
66
mod response_sent;
77

8-
use std::sync::Arc;
9-
10-
use bittorrent_udp_tracker_core::services::banning::BanService;
11-
use tokio::sync::RwLock;
128
use torrust_tracker_primitives::DurationSinceUnixEpoch;
139

1410
use crate::event::Event;
1511
use crate::statistics::repository::Repository;
1612

17-
pub async fn handle_event(
18-
event: Event,
19-
stats_repository: &Repository,
20-
ban_service: &Arc<RwLock<BanService>>,
21-
now: DurationSinceUnixEpoch,
22-
) {
13+
pub async fn handle_event(event: Event, stats_repository: &Repository, now: DurationSinceUnixEpoch) {
2314
match event {
2415
Event::UdpRequestAborted { context } => {
2516
request_aborted::handle_event(context, stats_repository, now).await;
@@ -41,7 +32,7 @@ pub async fn handle_event(
4132
response_sent::handle_event(context, kind, req_processing_time, stats_repository, now).await;
4233
}
4334
Event::UdpError { context, kind, error } => {
44-
error::handle_event(context, kind, error, stats_repository, ban_service, now).await;
35+
error::handle_event(context, kind, error, stats_repository, now).await;
4536
}
4637
}
4738

packages/udp-tracker-server/src/statistics/event/handler/request_aborted.rs

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,7 @@ pub async fn handle_event(context: ConnectionContext, stats_repository: &Reposit
2727
#[cfg(test)]
2828
mod tests {
2929
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
30-
use std::sync::Arc;
3130

32-
use bittorrent_udp_tracker_core::services::banning::BanService;
3331
use torrust_tracker_clock::clock::Time;
3432
use torrust_tracker_primitives::service_binding::{Protocol, ServiceBinding};
3533

@@ -41,7 +39,6 @@ mod tests {
4139
#[tokio::test]
4240
async fn should_increase_the_number_of_aborted_requests_when_it_receives_a_udp_request_aborted_event() {
4341
let stats_repository = Repository::new();
44-
let ban_service = Arc::new(tokio::sync::RwLock::new(BanService::new(1)));
4542

4643
handle_event(
4744
Event::UdpRequestAborted {
@@ -55,7 +52,6 @@ mod tests {
5552
),
5653
},
5754
&stats_repository,
58-
&ban_service,
5955
CurrentClock::now(),
6056
)
6157
.await;
@@ -68,7 +64,6 @@ mod tests {
6864
#[tokio::test]
6965
async fn should_increase_the_udp_abort_counter_when_it_receives_a_udp_abort_event() {
7066
let stats_repository = Repository::new();
71-
let ban_service = Arc::new(tokio::sync::RwLock::new(BanService::new(1)));
7267

7368
handle_event(
7469
Event::UdpRequestAborted {
@@ -82,7 +77,6 @@ mod tests {
8277
),
8378
},
8479
&stats_repository,
85-
&ban_service,
8680
CurrentClock::now(),
8781
)
8882
.await;

0 commit comments

Comments
 (0)