Skip to content

Commit 93f851d

Browse files
committed
Merge #1530: Overhaul stats: add peer inactivity metrics to torrent-repository package
677deac feat: [#1523] add new metric: number of inactive torrents (Jose Celano) 260f7ff feat: [#1523] add new metric: number of inactive peers (Jose Celano) Pull request description: This adds two new metrics: - [x] `torrent_repository_inactive_peers_total`: The number of inactive peers across all torrents. - [x] `torrent_repository_inactive_torrents_total`: The number of inactive torrents (inactive swarms). ACKs for top commit: josecelano: ACK 677deac Tree-SHA512: d58b8c756dfdd064b5a1552f05bf89357e8528d639e4eb41d5a2543e5cae8a16d4cb380ae1df388546b7f9a50bc52d76d5e7f55963a009a4e40e18d42f460dda
2 parents fbefc88 + 677deac commit 93f851d

File tree

11 files changed

+336
-5
lines changed

11 files changed

+336
-5
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

packages/torrent-repository/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ version.workspace = true
1818
[dependencies]
1919
aquatic_udp_protocol = "0"
2020
bittorrent-primitives = "0.1.0"
21+
chrono = { version = "0", default-features = false, features = ["clock"] }
2122
crossbeam-skiplist = "0"
2223
futures = "0"
2324
serde = { version = "1.0.219", features = ["derive"] }
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
//! Job that runs a task on intervals to update peers' activity metrics.
2+
use std::sync::Arc;
3+
4+
use chrono::Utc;
5+
use tokio::task::JoinHandle;
6+
use torrust_tracker_clock::clock::Time;
7+
use torrust_tracker_metrics::label::LabelSet;
8+
use torrust_tracker_metrics::metric_name;
9+
use torrust_tracker_primitives::DurationSinceUnixEpoch;
10+
use tracing::instrument;
11+
12+
use super::repository::Repository;
13+
use crate::statistics::{TORRENT_REPOSITORY_PEERS_INACTIVE_TOTAL, TORRENT_REPOSITORY_TORRENTS_INACTIVE_TOTAL};
14+
use crate::{CurrentClock, Swarms};
15+
16+
#[must_use]
17+
#[instrument(skip(swarms, stats_repository))]
18+
pub fn start_job(
19+
swarms: &Arc<Swarms>,
20+
stats_repository: &Arc<Repository>,
21+
inactivity_cutoff: DurationSinceUnixEpoch,
22+
) -> JoinHandle<()> {
23+
let weak_swarms = std::sync::Arc::downgrade(swarms);
24+
let weak_stats_repository = std::sync::Arc::downgrade(stats_repository);
25+
26+
let interval_in_secs = 15; // todo: make this configurable
27+
28+
tokio::spawn(async move {
29+
let interval = std::time::Duration::from_secs(interval_in_secs);
30+
let mut interval = tokio::time::interval(interval);
31+
interval.tick().await;
32+
33+
loop {
34+
tokio::select! {
35+
_ = tokio::signal::ctrl_c() => {
36+
tracing::info!("Stopping peers activity metrics update job (ctrl-c signal received) ...");
37+
break;
38+
}
39+
_ = interval.tick() => {
40+
if let (Some(swarms), Some(stats_repository)) = (weak_swarms.upgrade(), weak_stats_repository.upgrade()) {
41+
update_activity_metrics(interval_in_secs, &swarms, &stats_repository, inactivity_cutoff).await;
42+
} else {
43+
tracing::info!("Stopping peers activity metrics update job (can't upgrade weak pointers) ...");
44+
break;
45+
}
46+
}
47+
}
48+
}
49+
})
50+
}
51+
52+
async fn update_activity_metrics(
53+
interval_in_secs: u64,
54+
swarms: &Arc<Swarms>,
55+
stats_repository: &Arc<Repository>,
56+
inactivity_cutoff: DurationSinceUnixEpoch,
57+
) {
58+
let start_time = Utc::now().time();
59+
60+
tracing::debug!(
61+
"Updating peers and torrents activity metrics (executed every {} secs) ...",
62+
interval_in_secs
63+
);
64+
65+
let activity_metadata = swarms.get_activity_metadata(inactivity_cutoff).await;
66+
67+
activity_metadata.log();
68+
69+
update_inactive_peers_total(stats_repository, activity_metadata.inactive_peers_total).await;
70+
update_inactive_torrents_total(stats_repository, activity_metadata.inactive_torrents_total).await;
71+
72+
tracing::debug!(
73+
"Peers and torrents activity metrics updated in {} ms",
74+
(Utc::now().time() - start_time).num_milliseconds()
75+
);
76+
}
77+
78+
async fn update_inactive_peers_total(stats_repository: &Arc<Repository>, inactive_peers_total: usize) {
79+
#[allow(clippy::cast_precision_loss)]
80+
let inactive_peers_total = inactive_peers_total as f64;
81+
82+
let _unused = stats_repository
83+
.set_gauge(
84+
&metric_name!(TORRENT_REPOSITORY_PEERS_INACTIVE_TOTAL),
85+
&LabelSet::default(),
86+
inactive_peers_total,
87+
CurrentClock::now(),
88+
)
89+
.await;
90+
}
91+
92+
async fn update_inactive_torrents_total(stats_repository: &Arc<Repository>, inactive_torrents_total: usize) {
93+
#[allow(clippy::cast_precision_loss)]
94+
let inactive_torrents_total = inactive_torrents_total as f64;
95+
96+
let _unused = stats_repository
97+
.set_gauge(
98+
&metric_name!(TORRENT_REPOSITORY_TORRENTS_INACTIVE_TOTAL),
99+
&LabelSet::default(),
100+
inactive_torrents_total,
101+
CurrentClock::now(),
102+
)
103+
.await;
104+
}

packages/torrent-repository/src/statistics/mod.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
pub mod activity_metrics_updater;
12
pub mod event;
23
pub mod metrics;
34
pub mod repository;
@@ -14,6 +15,7 @@ const TORRENT_REPOSITORY_TORRENTS_REMOVED_TOTAL: &str = "torrent_repository_torr
1415

1516
const TORRENT_REPOSITORY_TORRENTS_TOTAL: &str = "torrent_repository_torrents_total";
1617
const TORRENT_REPOSITORY_TORRENTS_DOWNLOADS_TOTAL: &str = "torrent_repository_torrents_downloads_total";
18+
const TORRENT_REPOSITORY_TORRENTS_INACTIVE_TOTAL: &str = "torrent_repository_torrents_inactive_total";
1719

1820
// Peers metrics
1921

@@ -23,6 +25,7 @@ const TORRENT_REPOSITORY_PEERS_UPDATED_TOTAL: &str = "torrent_repository_peers_u
2325

2426
const TORRENT_REPOSITORY_PEER_CONNECTIONS_TOTAL: &str = "torrent_repository_peer_connections_total";
2527
const TORRENT_REPOSITORY_UNIQUE_PEERS_TOTAL: &str = "torrent_repository_unique_peers_total"; // todo: not implemented yet
28+
const TORRENT_REPOSITORY_PEERS_INACTIVE_TOTAL: &str = "torrent_repository_peers_inactive_total";
2629

2730
#[must_use]
2831
pub fn describe_metrics() -> Metrics {
@@ -54,6 +57,12 @@ pub fn describe_metrics() -> Metrics {
5457
Some(&MetricDescription::new("The total number of torrent downloads.")),
5558
);
5659

60+
metrics.metric_collection.describe_gauge(
61+
&metric_name!(TORRENT_REPOSITORY_TORRENTS_INACTIVE_TOTAL),
62+
Some(Unit::Count),
63+
Some(&MetricDescription::new("The total number of inactive torrents.")),
64+
);
65+
5766
// Peers metrics
5867

5968
metrics.metric_collection.describe_counter(
@@ -88,5 +97,11 @@ pub fn describe_metrics() -> Metrics {
8897
Some(&MetricDescription::new("The total number of unique peers.")),
8998
);
9099

100+
metrics.metric_collection.describe_gauge(
101+
&metric_name!(TORRENT_REPOSITORY_PEERS_INACTIVE_TOTAL),
102+
Some(Unit::Count),
103+
Some(&MetricDescription::new("The total number of inactive peers.")),
104+
);
105+
91106
metrics
92107
}

packages/torrent-repository/src/statistics/repository.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,31 @@ impl Repository {
5757
result
5858
}
5959

60+
/// # Errors
61+
///
62+
/// This function will return an error if the metric collection fails to
63+
/// set the gauge.
64+
pub async fn set_gauge(
65+
&self,
66+
metric_name: &MetricName,
67+
labels: &LabelSet,
68+
value: f64,
69+
now: DurationSinceUnixEpoch,
70+
) -> Result<(), Error> {
71+
let mut stats_lock = self.stats.write().await;
72+
73+
let result = stats_lock.set_gauge(metric_name, labels, value, now);
74+
75+
drop(stats_lock);
76+
77+
match result {
78+
Ok(()) => {}
79+
Err(ref err) => tracing::error!("Failed to set the gauge: {}", err),
80+
}
81+
82+
result
83+
}
84+
6085
/// # Errors
6186
///
6287
/// This function will return an error if the metric collection fails to

packages/torrent-repository/src/swarm.rs

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,25 @@ impl Swarm {
118118
(seeders, leechers)
119119
}
120120

121+
#[must_use]
122+
pub fn count_inactive_peers(&self, current_cutoff: DurationSinceUnixEpoch) -> usize {
123+
self.peers
124+
.iter()
125+
.filter(|(_, peer)| peer::ReadInfo::get_updated(&**peer) <= current_cutoff)
126+
.count()
127+
}
128+
129+
#[must_use]
130+
pub fn get_activity_metadata(&self, current_cutoff: DurationSinceUnixEpoch) -> ActivityMetadata {
131+
let inactive_peers_total = self.count_inactive_peers(current_cutoff);
132+
133+
let active_peers_total = self.len() - inactive_peers_total;
134+
135+
let is_active = active_peers_total > 0;
136+
137+
ActivityMetadata::new(is_active, active_peers_total, inactive_peers_total)
138+
}
139+
121140
#[must_use]
122141
pub fn len(&self) -> usize {
123142
self.peers.len()
@@ -288,6 +307,30 @@ impl Swarm {
288307
}
289308
}
290309

310+
#[derive(Clone)]
311+
pub struct ActivityMetadata {
312+
/// Indicates if the swarm is active. It's inactive if there are no active
313+
/// peers.
314+
pub is_active: bool,
315+
316+
/// The number of active peers in the swarm.
317+
pub active_peers_total: usize,
318+
319+
/// The number of inactive peers in the swarm.
320+
pub inactive_peers_total: usize,
321+
}
322+
323+
impl ActivityMetadata {
324+
#[must_use]
325+
pub fn new(is_active: bool, active_peers_total: usize, inactive_peers_total: usize) -> Self {
326+
Self {
327+
is_active,
328+
active_peers_total,
329+
inactive_peers_total,
330+
}
331+
}
332+
}
333+
291334
#[cfg(test)]
292335
mod tests {
293336

@@ -435,6 +478,22 @@ mod tests {
435478
assert_eq!(swarm.peers_excluding(&peer2.peer_addr, None), [Arc::new(peer1)]);
436479
}
437480

481+
#[tokio::test]
482+
async fn it_should_count_inactive_peers() {
483+
let mut swarm = Swarm::new(&sample_info_hash(), 0, None);
484+
let mut downloads_increased = false;
485+
let one_second = DurationSinceUnixEpoch::new(1, 0);
486+
487+
// Insert the peer
488+
let last_update_time = DurationSinceUnixEpoch::new(1_669_397_478_934, 0);
489+
let peer = PeerBuilder::default().last_updated_on(last_update_time).build();
490+
swarm.upsert_peer(peer.into(), &mut downloads_increased).await;
491+
492+
let inactive_peers_total = swarm.count_inactive_peers(last_update_time + one_second);
493+
494+
assert_eq!(inactive_peers_total, 1);
495+
}
496+
438497
#[tokio::test]
439498
async fn it_should_remove_inactive_peers() {
440499
let mut swarm = Swarm::new(&sample_info_hash(), 0, None);

packages/torrent-repository/src/swarms.rs

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -248,6 +248,44 @@ impl Swarms {
248248
}
249249
}
250250

251+
pub async fn get_activity_metadata(&self, current_cutoff: DurationSinceUnixEpoch) -> AggregateActivityMetadata {
252+
let mut active_peers_total = 0;
253+
let mut inactive_peers_total = 0;
254+
let mut active_torrents_total = 0;
255+
256+
for swarm_handle in &self.swarms {
257+
let swarm = swarm_handle.value().lock().await;
258+
259+
let activity_metadata = swarm.get_activity_metadata(current_cutoff);
260+
261+
if activity_metadata.is_active {
262+
active_torrents_total += 1;
263+
}
264+
265+
active_peers_total += activity_metadata.active_peers_total;
266+
inactive_peers_total += activity_metadata.inactive_peers_total;
267+
}
268+
269+
AggregateActivityMetadata {
270+
active_peers_total,
271+
inactive_peers_total,
272+
active_torrents_total,
273+
inactive_torrents_total: self.len() - active_torrents_total,
274+
}
275+
}
276+
277+
/// Counts the number of inactive peers across all torrents.
278+
pub async fn count_inactive_peers(&self, current_cutoff: DurationSinceUnixEpoch) -> usize {
279+
let mut inactive_peers_total = 0;
280+
281+
for swarm_handle in &self.swarms {
282+
let swarm = swarm_handle.value().lock().await;
283+
inactive_peers_total += swarm.count_inactive_peers(current_cutoff);
284+
}
285+
286+
inactive_peers_total
287+
}
288+
251289
/// Removes inactive peers from all torrent entries.
252290
///
253291
/// A peer is considered inactive if its last update timestamp is older than
@@ -434,6 +472,31 @@ impl Swarms {
434472
#[derive(thiserror::Error, Debug, Clone)]
435473
pub enum Error {}
436474

475+
#[derive(Clone, Debug, Default)]
476+
pub struct AggregateActivityMetadata {
477+
/// The number of active peers in all swarms.
478+
pub active_peers_total: usize,
479+
480+
/// The number of inactive peers in all swarms.
481+
pub inactive_peers_total: usize,
482+
483+
/// The number of active torrents.
484+
pub active_torrents_total: usize,
485+
486+
/// The number of inactive torrents.
487+
pub inactive_torrents_total: usize,
488+
}
489+
490+
impl AggregateActivityMetadata {
491+
pub fn log(&self) {
492+
tracing::info!(
493+
active_peers_total = self.active_peers_total,
494+
inactive_peers_total = self.inactive_peers_total,
495+
active_torrents_total = self.active_torrents_total,
496+
inactive_torrents_total = self.inactive_torrents_total
497+
);
498+
}
499+
}
437500
#[cfg(test)]
438501
mod tests {
439502

@@ -705,6 +768,22 @@ mod tests {
705768
assert!(swarms.get(&info_hash).is_none());
706769
}
707770

771+
#[tokio::test]
772+
async fn it_should_count_inactive_peers() {
773+
let swarms = Arc::new(Swarms::default());
774+
775+
let info_hash = sample_info_hash();
776+
let mut peer = sample_peer();
777+
peer.updated = DurationSinceUnixEpoch::new(0, 0);
778+
779+
swarms.handle_announcement(&info_hash, &peer, None).await.unwrap();
780+
781+
// Cut off time is 1 second after the peer was updated
782+
let inactive_peers_total = swarms.count_inactive_peers(peer.updated.add(Duration::from_secs(1))).await;
783+
784+
assert_eq!(inactive_peers_total, 1);
785+
}
786+
708787
#[tokio::test]
709788
async fn it_should_remove_peers_that_have_not_been_updated_after_a_cutoff_time() {
710789
let swarms = Arc::new(Swarms::default());

0 commit comments

Comments
 (0)