Commit fb47517: metrics

1 parent 14ed419 commit fb47517
iroh/examples/transfer.rs

Lines changed: 51 additions & 4 deletions
@@ -408,6 +408,11 @@ async fn fetch(endpoint: Endpoint, remote_addr: EndpointAddr, metrics_output: Op
     let start = Instant::now();
     let remote_id = remote_addr.id;
 
+    // Capture initial metrics
+    if metrics_output.is_some() {
+        emit_endpoint_metrics(&endpoint);
+    }
+
     // Attempt to connect, over the given ALPN.
     // Returns a Quinn connection.
     println!("EVENT:{{\"type\":\"ConnectionAttempt\",\"remote\":\"{}\",\"timestamp\":{}}}",
@@ -433,7 +438,7 @@ async fn fetch(endpoint: Endpoint, remote_addr: EndpointAddr, metrics_output: Op
     println!("EVENT:{{\"type\":\"TransferStart\",\"remote\":\"{}\",\"timestamp\":{}}}",
         remote_id, OffsetDateTime::now_utc().unix_timestamp());
 
-    let (len, time_to_first_byte, chnk) = drain_stream(&mut recv, false, endpoint.id(), metrics_output).await?;
+    let (len, time_to_first_byte, chnk) = drain_stream(&mut recv, false, &endpoint, metrics_output).await?;
 
     // We received the last message: close all connections and allow for the close
     // message to be sent.
@@ -477,21 +482,28 @@ async fn fetch(endpoint: Endpoint, remote_addr: EndpointAddr, metrics_output: Op
         info!("Metrics written to {:?}", path);
     }
 
+    // Emit final endpoint metrics snapshot
+    if metrics_output.is_some() {
+        emit_endpoint_metrics(&endpoint);
+    }
+
     Ok(())
 }
 
 async fn drain_stream(
     stream: &mut iroh::endpoint::RecvStream,
     read_unordered: bool,
-    endpoint_id: EndpointId,
+    endpoint: &Endpoint,
     metrics_output: Option<&std::path::PathBuf>,
 ) -> Result<(usize, Duration, u64)> {
+    let endpoint_id = endpoint.id();
     let mut read = 0;
 
     let download_start = Instant::now();
     let mut first_byte = true;
     let mut time_to_first_byte = download_start.elapsed();
     let mut last_metric_time = download_start;
+    let mut last_bytes = 0;
 
     let mut num_chunks: u64 = 0;
 
@@ -506,13 +518,19 @@ async fn drain_stream(
 
             // Emit periodic metrics every second
             if metrics_output.is_some() && last_metric_time.elapsed() >= Duration::from_secs(1) {
+                let bytes_since_last = read - last_bytes;
+                let instant_throughput = bytes_since_last as f64 / last_metric_time.elapsed().as_secs_f64();
+
                 emit_progress_metrics(
                     endpoint_id,
                     read,
                     download_start.elapsed(),
                     num_chunks,
+                    instant_throughput as u64,
                 );
+                emit_endpoint_metrics(endpoint);
                 last_metric_time = Instant::now();
+                last_bytes = read;
             }
         }
     } else {
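The periodic block above now reports two rates: the average throughput divides all bytes read so far by the total elapsed time, while the instant throughput divides only the bytes read since the previous emission by the roughly one-second window since then. A standalone sketch of that arithmetic, with hypothetical helper names that are not part of this diff:

use std::time::Duration;

// Bytes per second over the whole transfer so far (hypothetical helper,
// mirroring avg_throughput in emit_progress_metrics).
fn avg_throughput(total_bytes: usize, elapsed: Duration) -> u64 {
    if elapsed.as_secs_f64() > 0.0 {
        (total_bytes as f64 / elapsed.as_secs_f64()) as u64
    } else {
        0
    }
}

// Bytes per second over the window since the previous emission (hypothetical
// helper, mirroring the bytes_since_last calculation in drain_stream).
fn instant_throughput(bytes_since_last: usize, window: Duration) -> u64 {
    (bytes_since_last as f64 / window.as_secs_f64()) as u64
}

fn main() {
    // 10 MiB received over 4 s in total, but only 1 MiB in the last second:
    // the average reads ~2.5 MiB/s while the instant rate reads ~1 MiB/s.
    println!("avg: {} B/s", avg_throughput(10 * 1024 * 1024, Duration::from_secs(4)));
    println!("instant: {} B/s", instant_throughput(1024 * 1024, Duration::from_secs(1)));
}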
@@ -539,29 +557,57 @@ async fn drain_stream(
 
             // Emit periodic metrics every second
            if metrics_output.is_some() && last_metric_time.elapsed() >= Duration::from_secs(1) {
+                let bytes_since_last = read - last_bytes;
+                let instant_throughput = bytes_since_last as f64 / last_metric_time.elapsed().as_secs_f64();
+
                 emit_progress_metrics(
                     endpoint_id,
                     read,
                     download_start.elapsed(),
                     num_chunks,
+                    instant_throughput as u64,
                 );
+                emit_endpoint_metrics(endpoint);
                 last_metric_time = Instant::now();
+                last_bytes = read;
             }
         }
     }
 
     Ok((read, time_to_first_byte, num_chunks))
 }
 
+fn emit_endpoint_metrics(endpoint: &Endpoint) {
+    let timestamp = OffsetDateTime::now_utc();
+    let metrics = endpoint.metrics();
+
+    // Serialize entire EndpointMetrics struct
+    let metrics_serialized = match serde_json::to_value(metrics) {
+        Ok(mut val) => {
+            // Add timestamp and node_id to the metrics object
+            if let Some(obj) = val.as_object_mut() {
+                obj.insert("timestamp".to_string(), json!(timestamp.unix_timestamp()));
+                obj.insert("timestamp_rfc3339".to_string(), json!(timestamp.to_string()));
+                obj.insert("node_id".to_string(), json!(endpoint.id().to_string()));
+            }
+            val
+        }
+        Err(_) => return,
+    };
+
+    println!("ENDPOINT_METRICS:{}", serde_json::to_string(&metrics_serialized).unwrap_or_default());
+}
+
 fn emit_progress_metrics(
     endpoint_id: EndpointId,
     bytes_transferred: usize,
     elapsed: Duration,
     chunks: u64,
+    instant_throughput: u64,
 ) {
     let timestamp = OffsetDateTime::now_utc();
 
-    let throughput = if elapsed.as_secs_f64() > 0.0 {
+    let avg_throughput = if elapsed.as_secs_f64() > 0.0 {
         bytes_transferred as f64 / elapsed.as_secs_f64()
     } else {
         0.0
@@ -573,7 +619,8 @@ fn emit_progress_metrics(
         "node_id": endpoint_id.to_string(),
         "bytes_transferred": bytes_transferred,
         "elapsed_ms": elapsed.as_millis() as u64,
-        "current_throughput": throughput as u64,
+        "avg_throughput": avg_throughput as u64,
+        "instant_throughput": instant_throughput,
         "chunks": chunks,
     });
 
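Each endpoint snapshot is printed to stdout as a single line prefixed with ENDPOINT_METRICS:, followed by the JSON object assembled in emit_endpoint_metrics. As an illustration only (not part of this commit), a minimal consumer that picks those lines out of the example's output, assuming the output is piped in on stdin and the serde_json crate is available:

use std::io::{self, BufRead};

fn main() -> io::Result<()> {
    // Read the transfer example's output line by line (e.g. piped in on stdin).
    for line in io::stdin().lock().lines() {
        let line = line?;
        // Endpoint metrics snapshots look like `ENDPOINT_METRICS:{...}`.
        if let Some(payload) = line.strip_prefix("ENDPOINT_METRICS:") {
            match serde_json::from_str::<serde_json::Value>(payload) {
                Ok(snapshot) => {
                    // node_id and timestamp are inserted by emit_endpoint_metrics above.
                    let node = snapshot.get("node_id").and_then(|v| v.as_str()).unwrap_or("?");
                    let ts = snapshot.get("timestamp").and_then(|v| v.as_i64()).unwrap_or(0);
                    println!("snapshot from {node} at {ts}");
                }
                Err(err) => eprintln!("skipping malformed metrics line: {err}"),
            }
        }
    }
    Ok(())
}

A progress line from emit_progress_metrics could be handled the same way, keying on the fields visible in the diff (bytes_transferred, avg_throughput, instant_throughput, chunks); its exact line prefix is not shown in these hunks.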