Skip to content

Commit a7cbdcb

Browse files
committed
Merge #1557: Add a new labeled metric to track requests with wrong connection ID per client's software.
7e616d7 feat: [#1556] add a new metric to count connection ID errors per clietn software (Jose Celano) d9f4c13 refactor: [#1556] extract functions (Jose Celano) Pull request description: Add a new labeled metric to track requests with wrong connection ID per client's software. ACKs for top commit: josecelano: ACK 7e616d7 Tree-SHA512: ed8c2be5845a179692b18b0cc84cbaccf1b7f8178f1754835ee472041eab8c35d366fa9e8193ae4020aba35c1272f0551003febf80e37c8d15597d72f4ca8ca4
2 parents 578057b + 7e616d7 commit a7cbdcb

File tree

9 files changed

+175
-60
lines changed

9 files changed

+175
-60
lines changed

cSpell.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,7 @@
127127
"proto",
128128
"Quickstart",
129129
"Radeon",
130+
"Rakshasa",
130131
"Rasterbar",
131132
"realpath",
132133
"reannounce",

packages/udp-tracker-server/src/event.rs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use std::fmt;
22
use std::net::SocketAddr;
33
use std::time::Duration;
44

5+
use aquatic_udp_protocol::AnnounceRequest;
56
use bittorrent_tracker_core::error::{AnnounceError, ScrapeError};
67
use bittorrent_udp_tracker_core::services::announce::UdpAnnounceError;
78
use bittorrent_udp_tracker_core::services::scrape::UdpScrapeError;
@@ -42,15 +43,25 @@ pub enum Event {
4243
#[derive(Debug, PartialEq, Eq, Clone)]
4344
pub enum UdpRequestKind {
4445
Connect,
45-
Announce,
46+
Announce { announce_request: AnnounceRequest },
4647
Scrape,
4748
}
4849

50+
impl From<UdpRequestKind> for LabelValue {
51+
fn from(kind: UdpRequestKind) -> Self {
52+
match kind {
53+
UdpRequestKind::Connect => LabelValue::new("connect"),
54+
UdpRequestKind::Announce { .. } => LabelValue::new("announce"),
55+
UdpRequestKind::Scrape => LabelValue::new("scrape"),
56+
}
57+
}
58+
}
59+
4960
impl fmt::Display for UdpRequestKind {
5061
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
5162
let proto_str = match self {
5263
UdpRequestKind::Connect => "connect",
53-
UdpRequestKind::Announce => "announce",
64+
UdpRequestKind::Announce { .. } => "announce",
5465
UdpRequestKind::Scrape => "scrape",
5566
};
5667
write!(f, "{proto_str}")

packages/udp-tracker-server/src/handlers/announce.rs

Lines changed: 34 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -44,15 +44,25 @@ pub async fn handle_announce(
4444
udp_server_stats_event_sender
4545
.send(Event::UdpRequestAccepted {
4646
context: ConnectionContext::new(client_socket_addr, server_service_binding.clone()),
47-
kind: UdpRequestKind::Announce,
47+
kind: UdpRequestKind::Announce {
48+
announce_request: *request,
49+
},
4850
})
4951
.await;
5052
}
5153

5254
let announce_data = announce_service
5355
.handle_announce(client_socket_addr, server_service_binding, request, cookie_valid_range)
5456
.await
55-
.map_err(|e| (e.into(), request.transaction_id, UdpRequestKind::Announce))?;
57+
.map_err(|e| {
58+
(
59+
e.into(),
60+
request.transaction_id,
61+
UdpRequestKind::Announce {
62+
announce_request: *request,
63+
},
64+
)
65+
})?;
5666

5767
Ok(build_response(client_socket_addr, request, core_config, &announce_data))
5868
}
@@ -118,9 +128,9 @@ fn build_response(
118128
}
119129

120130
#[cfg(test)]
121-
mod tests {
131+
pub(crate) mod tests {
122132

123-
mod announce_request {
133+
pub mod announce_request {
124134

125135
use std::net::Ipv4Addr;
126136
use std::num::NonZeroU16;
@@ -133,7 +143,7 @@ mod tests {
133143

134144
use crate::handlers::tests::{sample_ipv4_remote_addr_fingerprint, sample_issue_time};
135145

136-
struct AnnounceRequestBuilder {
146+
pub struct AnnounceRequestBuilder {
137147
request: AnnounceRequest,
138148
}
139149

@@ -431,13 +441,14 @@ mod tests {
431441
let client_socket_addr = sample_ipv4_socket_address();
432442
let server_socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 196)), 6969);
433443
let server_service_binding = ServiceBinding::new(Protocol::UDP, server_socket_addr).unwrap();
444+
let announce_request = AnnounceRequestBuilder::default().into();
434445

435446
let mut udp_server_stats_event_sender_mock = MockUdpServerStatsEventSender::new();
436447
udp_server_stats_event_sender_mock
437448
.expect_send()
438449
.with(eq(Event::UdpRequestAccepted {
439450
context: ConnectionContext::new(client_socket_addr, server_service_binding.clone()),
440-
kind: UdpRequestKind::Announce,
451+
kind: UdpRequestKind::Announce { announce_request },
441452
}))
442453
.times(1)
443454
.returning(|_| Box::pin(future::ready(Some(Ok(1)))));
@@ -451,7 +462,7 @@ mod tests {
451462
&core_udp_tracker_services.announce_service,
452463
client_socket_addr,
453464
server_service_binding,
454-
&AnnounceRequestBuilder::default().into(),
465+
&announce_request,
455466
&core_tracker_services.core_config,
456467
&udp_server_stats_event_sender,
457468
sample_cookie_valid_range(),
@@ -795,12 +806,16 @@ mod tests {
795806
let server_socket_addr = SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 203, 0, 113, 196)), 6969);
796807
let server_service_binding = ServiceBinding::new(Protocol::UDP, server_socket_addr).unwrap();
797808

809+
let announce_request = AnnounceRequestBuilder::default()
810+
.with_connection_id(make(gen_remote_fingerprint(&client_socket_addr), sample_issue_time()).unwrap())
811+
.into();
812+
798813
let mut udp_server_stats_event_sender_mock = MockUdpServerStatsEventSender::new();
799814
udp_server_stats_event_sender_mock
800815
.expect_send()
801816
.with(eq(Event::UdpRequestAccepted {
802817
context: ConnectionContext::new(client_socket_addr, server_service_binding.clone()),
803-
kind: UdpRequestKind::Announce,
818+
kind: UdpRequestKind::Announce { announce_request },
804819
}))
805820
.times(1)
806821
.returning(|_| Box::pin(future::ready(Some(Ok(1)))));
@@ -810,10 +825,6 @@ mod tests {
810825
let (core_tracker_services, core_udp_tracker_services, _server_udp_tracker_services) =
811826
initialize_core_tracker_services_for_default_tracker_configuration();
812827

813-
let announce_request = AnnounceRequestBuilder::default()
814-
.with_connection_id(make(gen_remote_fingerprint(&client_socket_addr), sample_issue_time()).unwrap())
815-
.into();
816-
817828
handle_announce(
818829
&core_udp_tracker_services.announce_service,
819830
client_socket_addr,
@@ -887,6 +898,14 @@ mod tests {
887898
let in_memory_torrent_repository = Arc::new(InMemoryTorrentRepository::default());
888899
let db_downloads_metric_repository = Arc::new(DatabaseDownloadsMetricRepository::new(&database));
889900

901+
let request = AnnounceRequestBuilder::default()
902+
.with_connection_id(make(gen_remote_fingerprint(&client_socket_addr), sample_issue_time()).unwrap())
903+
.with_info_hash(info_hash)
904+
.with_peer_id(peer_id)
905+
.with_ip_address(client_ip_v4)
906+
.with_port(client_port)
907+
.into();
908+
890909
let mut udp_core_stats_event_sender_mock = MockUdpCoreStatsEventSender::new();
891910
udp_core_stats_event_sender_mock
892911
.expect_send()
@@ -912,7 +931,9 @@ mod tests {
912931
.expect_send()
913932
.with(eq(Event::UdpRequestAccepted {
914933
context: ConnectionContext::new(client_socket_addr, server_service_binding_clone.clone()),
915-
kind: UdpRequestKind::Announce,
934+
kind: UdpRequestKind::Announce {
935+
announce_request: request,
936+
},
916937
}))
917938
.times(1)
918939
.returning(|_| Box::pin(future::ready(Some(Ok(1)))));
@@ -926,14 +947,6 @@ mod tests {
926947
&db_downloads_metric_repository,
927948
));
928949

929-
let request = AnnounceRequestBuilder::default()
930-
.with_connection_id(make(gen_remote_fingerprint(&client_socket_addr), sample_issue_time()).unwrap())
931-
.with_info_hash(info_hash)
932-
.with_peer_id(peer_id)
933-
.with_ip_address(client_ip_v4)
934-
.with_port(client_port)
935-
.into();
936-
937950
let core_config = Arc::new(config.core.clone());
938951

939952
let announce_service = Arc::new(AnnounceService::new(

packages/udp-tracker-server/src/handlers/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ pub async fn handle_request(
177177
)
178178
.await
179179
{
180-
Ok(response) => Ok((response, UdpRequestKind::Announce)),
180+
Ok(response) => Ok((response, UdpRequestKind::Announce { announce_request })),
181181
Err(err) => Err(err),
182182
}
183183
}

packages/udp-tracker-server/src/server/processor.rs

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -87,16 +87,15 @@ impl Processor {
8787
};
8888

8989
let udp_response_kind = match &response {
90-
Response::Connect(_) => event::UdpResponseKind::Ok {
91-
req_kind: event::UdpRequestKind::Connect,
92-
},
93-
Response::AnnounceIpv4(_) | Response::AnnounceIpv6(_) => event::UdpResponseKind::Ok {
94-
req_kind: event::UdpRequestKind::Announce,
95-
},
96-
Response::Scrape(_) => event::UdpResponseKind::Ok {
97-
req_kind: event::UdpRequestKind::Scrape,
98-
},
9990
Response::Error(_e) => event::UdpResponseKind::Error { opt_req_kind: None },
91+
_ => {
92+
if let Some(req_kind) = opt_req_kind {
93+
event::UdpResponseKind::Ok { req_kind }
94+
} else {
95+
// code-review: this case should never happen.
96+
event::UdpResponseKind::Error { opt_req_kind }
97+
}
98+
}
10099
};
101100

102101
let mut writer = Cursor::new(Vec::with_capacity(200));

packages/udp-tracker-server/src/statistics/event/handler/error.rs

Lines changed: 92 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use std::sync::Arc;
22

3+
use aquatic_udp_protocol::PeerClient;
34
use bittorrent_udp_tracker_core::services::banning::BanService;
45
use tokio::sync::RwLock;
56
use torrust_tracker_metrics::label::LabelSet;
@@ -8,45 +9,118 @@ use torrust_tracker_primitives::DurationSinceUnixEpoch;
89

910
use crate::event::{ConnectionContext, ErrorKind, UdpRequestKind};
1011
use crate::statistics::repository::Repository;
11-
use crate::statistics::UDP_TRACKER_SERVER_ERRORS_TOTAL;
12+
use crate::statistics::{UDP_TRACKER_SERVER_CONNECTION_ID_ERRORS_TOTAL, UDP_TRACKER_SERVER_ERRORS_TOTAL};
1213

1314
pub async fn handle_event(
14-
context: ConnectionContext,
15-
kind: Option<UdpRequestKind>,
16-
error: ErrorKind,
17-
stats_repository: &Repository,
15+
connection_context: ConnectionContext,
16+
opt_udp_request_kind: Option<UdpRequestKind>,
17+
error_kind: ErrorKind,
18+
repository: &Repository,
1819
ban_service: &Arc<RwLock<BanService>>,
1920
now: DurationSinceUnixEpoch,
2021
) {
21-
// Increase the number of errors
22-
// code-review: should we ban IP due to other errors too?
23-
if let ErrorKind::ConnectionCookie(_msg) = error {
22+
if let ErrorKind::ConnectionCookie(_msg) = error_kind.clone() {
2423
let mut ban_service = ban_service.write().await;
25-
ban_service.increase_counter(&context.client_socket_addr().ip());
24+
ban_service.increase_counter(&connection_context.client_socket_addr().ip());
2625
}
2726

28-
// Global fixed metrics
29-
match context.client_socket_addr().ip() {
27+
update_global_fixed_metrics(&connection_context, repository).await;
28+
29+
update_extendable_metrics(&connection_context, opt_udp_request_kind, error_kind, repository, now).await;
30+
}
31+
32+
async fn update_global_fixed_metrics(connection_context: &ConnectionContext, repository: &Repository) {
33+
match connection_context.client_socket_addr().ip() {
3034
std::net::IpAddr::V4(_) => {
31-
stats_repository.increase_udp4_errors().await;
35+
repository.increase_udp4_errors().await;
3236
}
3337
std::net::IpAddr::V6(_) => {
34-
stats_repository.increase_udp6_errors().await;
38+
repository.increase_udp6_errors().await;
3539
}
3640
}
41+
}
3742

38-
// Extendable metrics
39-
let mut label_set = LabelSet::from(context);
40-
if let Some(kind) = kind {
43+
async fn update_extendable_metrics(
44+
connection_context: &ConnectionContext,
45+
opt_udp_request_kind: Option<UdpRequestKind>,
46+
error_kind: ErrorKind,
47+
repository: &Repository,
48+
now: DurationSinceUnixEpoch,
49+
) {
50+
update_all_errors_counter(connection_context, opt_udp_request_kind.clone(), repository, now).await;
51+
update_connection_id_errors_counter(opt_udp_request_kind, error_kind, repository, now).await;
52+
}
53+
54+
async fn update_all_errors_counter(
55+
connection_context: &ConnectionContext,
56+
opt_udp_request_kind: Option<UdpRequestKind>,
57+
repository: &Repository,
58+
now: DurationSinceUnixEpoch,
59+
) {
60+
let mut label_set = LabelSet::from(connection_context.clone());
61+
62+
if let Some(kind) = opt_udp_request_kind.clone() {
4163
label_set.upsert(label_name!("request_kind"), kind.to_string().into());
4264
}
43-
match stats_repository
65+
66+
match repository
4467
.increase_counter(&metric_name!(UDP_TRACKER_SERVER_ERRORS_TOTAL), &label_set, now)
4568
.await
4669
{
4770
Ok(()) => {}
4871
Err(err) => tracing::error!("Failed to increase the counter: {}", err),
49-
};
72+
}
73+
}
74+
75+
async fn update_connection_id_errors_counter(
76+
opt_udp_request_kind: Option<UdpRequestKind>,
77+
error_kind: ErrorKind,
78+
repository: &Repository,
79+
now: DurationSinceUnixEpoch,
80+
) {
81+
if let ErrorKind::ConnectionCookie(_) = error_kind {
82+
if let Some(UdpRequestKind::Announce { announce_request }) = opt_udp_request_kind {
83+
let (client_software_name, client_software_version) = extract_name_and_version(&announce_request.peer_id.client());
84+
85+
let label_set = LabelSet::from([
86+
(label_name!("client_software_name"), client_software_name.into()),
87+
(label_name!("client_software_version"), client_software_version.into()),
88+
]);
89+
90+
match repository
91+
.increase_counter(&metric_name!(UDP_TRACKER_SERVER_CONNECTION_ID_ERRORS_TOTAL), &label_set, now)
92+
.await
93+
{
94+
Ok(()) => {}
95+
Err(err) => tracing::error!("Failed to increase the counter: {}", err),
96+
};
97+
}
98+
}
99+
}
100+
101+
fn extract_name_and_version(peer_client: &PeerClient) -> (String, String) {
102+
match peer_client {
103+
PeerClient::BitTorrent(compact_string) => ("BitTorrent".to_string(), compact_string.as_str().to_owned()),
104+
PeerClient::Deluge(compact_string) => ("Deluge".to_string(), compact_string.as_str().to_owned()),
105+
PeerClient::LibTorrentRakshasa(compact_string) => ("lt (rakshasa)".to_string(), compact_string.as_str().to_owned()),
106+
PeerClient::LibTorrentRasterbar(compact_string) => ("lt (rasterbar)".to_string(), compact_string.as_str().to_owned()),
107+
PeerClient::QBitTorrent(compact_string) => ("QBitTorrent".to_string(), compact_string.as_str().to_owned()),
108+
PeerClient::Transmission(compact_string) => ("Transmission".to_string(), compact_string.as_str().to_owned()),
109+
PeerClient::UTorrent(compact_string) => ("µTorrent".to_string(), compact_string.as_str().to_owned()),
110+
PeerClient::UTorrentEmbedded(compact_string) => ("µTorrent Emb.".to_string(), compact_string.as_str().to_owned()),
111+
PeerClient::UTorrentMac(compact_string) => ("µTorrent Mac".to_string(), compact_string.as_str().to_owned()),
112+
PeerClient::UTorrentWeb(compact_string) => ("µTorrent Web".to_string(), compact_string.as_str().to_owned()),
113+
PeerClient::Vuze(compact_string) => ("Vuze".to_string(), compact_string.as_str().to_owned()),
114+
PeerClient::WebTorrent(compact_string) => ("WebTorrent".to_string(), compact_string.as_str().to_owned()),
115+
PeerClient::WebTorrentDesktop(compact_string) => ("WebTorrent Desktop".to_string(), compact_string.as_str().to_owned()),
116+
PeerClient::Mainline(compact_string) => ("Mainline".to_string(), compact_string.as_str().to_owned()),
117+
PeerClient::OtherWithPrefixAndVersion { prefix, version } => {
118+
(format!("Other ({})", prefix.as_str()), version.as_str().to_owned())
119+
}
120+
PeerClient::OtherWithPrefix(compact_string) => (format!("Other ({compact_string})"), String::new()),
121+
PeerClient::Other => ("Other".to_string(), String::new()),
122+
_ => ("Unknown".to_string(), String::new()),
123+
}
50124
}
51125

52126
#[cfg(test)]

0 commit comments

Comments
 (0)