
Commit 072c2d7

An attempt to address the data loss caused by a fundamental mismatch between synchronous signals and asynchronous effects
How It Works: The new shutdown sequence ensures messages are not lost by:
1. Cancelling TelemetryListener and awaiting HTTP server shutdown
2. Dropping TelemetryListener to close the logs_tx channel
3. Cancelling LogsAgent and awaiting its completion (ensuring all logs are drained)
4. Sending the tombstone event only after all messages are processed
5. Dropping event_bus_tx and waiting for the tombstone to be processed

A self-contained sketch of this ordering follows the file summary below.
1 parent ea48140 commit 072c2d7

File tree

2 files changed (+136, -80 lines)
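Why the ordering matters can be seen in a minimal, self-contained sketch (assuming tokio and tokio-util; the Event enum, channel sizes, and task bodies are illustrative stand-ins, not the actual bottlecap types): the producer is cancelled and awaited first, its sender is dropped to close the channel, the consumer's drain is awaited, and only then is the tombstone sent and the bus allowed to close.

use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;

enum Event {
    Log(String),
    Tombstone,
}

#[tokio::main]
async fn main() {
    // logs channel: "TelemetryListener" -> "LogsAgent"
    let (logs_tx, mut logs_rx) = mpsc::channel::<String>(64);
    // event bus: "LogsAgent" (and the shutdown path) -> main event loop
    let (event_bus_tx, mut event_bus) = mpsc::channel::<Event>(64);

    // Stand-in for the TelemetryListener: produces a few events, then
    // stays "up" until its cancellation token fires.
    let listener_cancel = CancellationToken::new();
    let listener_handle = {
        let cancel = listener_cancel.clone();
        let logs_tx = logs_tx.clone();
        tokio::spawn(async move {
            for i in 0..10u32 {
                let _ = logs_tx.send(format!("log {i}")).await;
            }
            cancel.cancelled().await;
            // Task exit drops this logs_tx clone.
        })
    };

    // Stand-in for the LogsAgent: drains logs_rx until every sender is gone.
    // (bottlecap also cancels the agent explicitly; closing the channel is
    // enough to end this simplified drain loop.)
    let agent_handle = {
        let bus = event_bus_tx.clone();
        tokio::spawn(async move {
            while let Some(line) = logs_rx.recv().await {
                let _ = bus.send(Event::Log(line)).await;
            }
        })
    };

    // Shutdown, in the order the commit message describes:
    listener_cancel.cancel();       // 1. cancel the listener...
    listener_handle.await.unwrap(); //    ...and await it (no in-flight sends left)
    drop(logs_tx);                  // 2. close the logs channel
    agent_handle.await.unwrap();    // 3. await the agent's drain of queued logs
    let _ = event_bus_tx.send(Event::Tombstone).await; // 4. tombstone only now
    drop(event_bus_tx);             // 5. let the event bus close...
    let mut processed = 0usize;
    while let Some(ev) = event_bus.recv().await {
        match ev {
            Event::Log(_) => processed += 1,
            Event::Tombstone => break, //    ...and wait for the tombstone
        }
    }
    println!("{processed} log events processed before shutdown");
}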

bottlecap/src/bin/bottlecap/main.rs

Lines changed: 120 additions & 52 deletions
@@ -511,14 +511,19 @@ async fn extension_loop_active(
         .to_string();
     let tags_provider = setup_tag_provider(&Arc::clone(&aws_config), config, &account_id);
 
-    let (logs_agent_channel, logs_flusher, logs_agent_cancel_token, logs_aggregator_handle) =
-        start_logs_agent(
-            config,
-            Arc::clone(&api_key_factory),
-            &tags_provider,
-            event_bus_tx.clone(),
-            aws_config.is_managed_instance_mode(),
-        );
+    let (
+        logs_agent_channel,
+        logs_flusher,
+        logs_agent_cancel_token,
+        logs_aggregator_handle,
+        logs_agent_handle,
+    ) = start_logs_agent(
+        config,
+        Arc::clone(&api_key_factory),
+        &tags_provider,
+        event_bus_tx.clone(),
+        aws_config.is_managed_instance_mode(),
+    );
 
     let (metrics_flushers, metrics_aggregator_handle, dogstatsd_cancel_token) =
         start_dogstatsd(tags_provider.clone(), Arc::clone(&api_key_factory), config).await;
@@ -584,16 +589,16 @@ async fn extension_loop_active(
         }
     });
 
-    let telemetry_listener_cancel_token = setup_telemetry_client(
-        client,
-        &r.extension_id,
-        &aws_config.runtime_api,
-        logs_agent_channel,
-        event_bus_tx.clone(),
-        config.serverless_logs_enabled,
-        aws_config.is_managed_instance_mode(),
-    )
-    .await?;
+    let (telemetry_listener_cancel_token, telemetry_listener, telemetry_listener_handle) =
+        setup_telemetry_client(
+            client,
+            &r.extension_id,
+            &aws_config.runtime_api,
+            logs_agent_channel,
+            config.serverless_logs_enabled,
+            aws_config.is_managed_instance_mode(),
+        )
+        .await?;
 
     let otlp_cancel_token = start_otlp_agent(
         config,
@@ -780,8 +785,43 @@ async fn extension_loop_active(
     // Shutdown sequence
     debug!("Initiating shutdown sequence");
 
-    // Wait for tombstone event from telemetry listener to ensure all events are processed
-    // This is the result of code refactoring which is shared by OnDemand mode as well.
+    // Step 1: Cancel TelemetryListener HTTP server
+    debug!("SHUTDOWN | Step 1: Cancelling TelemetryListener");
+    telemetry_listener_cancel_token.cancel();
+
+    // Step 2: Await TelemetryListener HTTP server to fully shut down
+    // This ensures all in-flight HTTP requests are completed and logs_tx channel receives all messages
+    debug!("SHUTDOWN | Step 2: Awaiting TelemetryListener HTTP server shutdown");
+    if let Err(e) = telemetry_listener_handle.await {
+        error!("Error waiting for telemetry listener to shut down: {e:?}");
+    }
+
+    // Step 3: Drop TelemetryListener to close logs_tx channel
+    debug!("SHUTDOWN | Step 3: Dropping TelemetryListener to close logs_tx channel");
+    drop(telemetry_listener);
+
+    // Step 4: Cancel LogsAgent
+    debug!("SHUTDOWN | Step 4: Cancelling LogsAgent");
+    logs_agent_cancel_token.cancel();
+
+    // Step 5: Await LogsAgent to finish draining all logs from logs_tx channel
+    debug!("SHUTDOWN | Step 5: Awaiting LogsAgent to drain all logs");
+    if let Err(e) = logs_agent_handle.await {
+        error!("Error waiting for logs agent to shut down: {e:?}");
+    }
+
+    // Step 6: Send tombstone event to signal no more telemetry events
+    debug!("SHUTDOWN | Step 6: Sending tombstone event");
+    if let Err(e) = event_bus_tx.send(Event::Tombstone).await {
+        error!("Failed to send tombstone event: {e:?}");
+    }
+
+    // Step 7: Drop event_bus_tx to allow the event_bus channel to close
+    debug!("SHUTDOWN | Step 7: Dropping event_bus_tx");
+    drop(event_bus_tx);
+
+    // Step 8: Wait for tombstone event to be processed from event_bus
+    debug!("SHUTDOWN | Step 8: Waiting for tombstone event to be processed");
     wait_for_tombstone_event(
         &mut event_bus,
         &invocation_processor_handle,
@@ -794,13 +834,14 @@ async fn extension_loop_active(
     )
     .await;
 
-    // Cancel background tasks
+    // Step 9: Cancel other background services
+    debug!("SHUTDOWN | Step 9: Cancelling other background services");
     cancel_background_services(
         api_runtime_proxy_shutdown_signal.as_ref(),
         otlp_cancel_token.as_ref(),
         &trace_agent_shutdown_token,
         &dogstatsd_cancel_token,
-        &telemetry_listener_cancel_token,
+        &telemetry_listener_cancel_token, // Already cancelled, but included for completeness
        &lifecycle_listener_shutdown_token,
     );
 
@@ -1023,18 +1064,9 @@ async fn extension_loop_active(
     }
 
     if let NextEventResponse::Shutdown { .. } = maybe_shutdown_event {
-        // Cancel Telemetry API listener
-        // Important to do this first, so we can receive the Tombstone event which signals
-        // that there are no more Telemetry events to process
-        telemetry_listener_cancel_token.cancel();
-
-        // Cancel Logs Agent which might have Telemetry API events to process
-        logs_agent_cancel_token.cancel();
+        debug!("OnDemand mode: Initiating shutdown sequence");
 
-        // Drop the event bus sender to allow the channel to close properly
-        drop(event_bus_tx);
-
-        // Redrive/block on any failed payloads
+        // Redrive/block on any failed payloads before starting shutdown
        let tf = trace_flusher.clone();
        pending_flush_handles
            .await_flush_handles(
@@ -1044,7 +1076,44 @@ async fn extension_loop_active(
                &proxy_flusher,
            )
            .await;
-        // Wait for tombstone event from telemetry listener to ensure all events are processed
+
+        // Step 1: Cancel TelemetryListener HTTP server
+        debug!("SHUTDOWN | Step 1: Cancelling TelemetryListener");
+        telemetry_listener_cancel_token.cancel();
+
+        // Step 2: Await TelemetryListener HTTP server to fully shut down
+        // This ensures all in-flight HTTP requests are completed and logs_tx channel receives all messages
+        debug!("SHUTDOWN | Step 2: Awaiting TelemetryListener HTTP server shutdown");
+        if let Err(e) = telemetry_listener_handle.await {
+            error!("Error waiting for telemetry listener to shut down: {e:?}");
+        }
+
+        // Step 3: Drop TelemetryListener to close logs_tx channel
+        debug!("SHUTDOWN | Step 3: Dropping TelemetryListener to close logs_tx channel");
+        drop(telemetry_listener);
+
+        // Step 4: Cancel LogsAgent
+        debug!("SHUTDOWN | Step 4: Cancelling LogsAgent");
+        logs_agent_cancel_token.cancel();
+
+        // Step 5: Await LogsAgent to finish draining all logs from logs_tx channel
+        debug!("SHUTDOWN | Step 5: Awaiting LogsAgent to drain all logs");
+        if let Err(e) = logs_agent_handle.await {
+            error!("Error waiting for logs agent to shut down: {e:?}");
+        }
+
+        // Step 6: Send tombstone event to signal no more telemetry events
+        debug!("SHUTDOWN | Step 6: Sending tombstone event");
+        if let Err(e) = event_bus_tx.send(Event::Tombstone).await {
+            error!("Failed to send tombstone event: {e:?}");
+        }
+
+        // Step 7: Drop event_bus_tx to allow the event_bus channel to close
+        debug!("SHUTDOWN | Step 7: Dropping event_bus_tx");
+        drop(event_bus_tx);
+
+        // Step 8: Wait for tombstone event to be processed from event_bus
+        debug!("SHUTDOWN | Step 8: Waiting for tombstone event to be processed");
        wait_for_tombstone_event(
            &mut event_bus,
            &invocation_processor_handle,
@@ -1057,13 +1126,14 @@ async fn extension_loop_active(
        )
        .await;
 
-        // Cancel background services
+        // Step 9: Cancel other background services
+        debug!("SHUTDOWN | Step 9: Cancelling other background services");
        cancel_background_services(
            api_runtime_proxy_shutdown_signal.as_ref(),
            otlp_cancel_token.as_ref(),
            &trace_agent_shutdown_token,
            &dogstatsd_cancel_token,
-            &telemetry_listener_cancel_token,
+            &telemetry_listener_cancel_token, // Already cancelled, but included for completeness
            &lifecycle_listener_shutdown_token,
        );
 
@@ -1410,6 +1480,7 @@ fn start_logs_agent(
     LogsFlusher,
     CancellationToken,
     LogsAggregatorHandle,
+    JoinHandle<()>,
 ) {
     let (aggregator_service, aggregator_handle) = LogsAggregatorService::default();
     // Start service in background
@@ -1425,16 +1496,22 @@
         is_managed_instance_mode,
     );
     let cancel_token = agent.cancel_token();
-    // Start logs agent in background
-    tokio::spawn(async move {
+    // Start logs agent in background and return JoinHandle for awaitable completion
+    let logs_agent_handle = tokio::spawn(async move {
         agent.spin().await;
 
         debug!("LOGS_AGENT | Shutting down...");
         drop(agent);
     });
 
     let flusher = LogsFlusher::new(api_key_factory, aggregator_handle.clone(), config.clone());
-    (tx, flusher, cancel_token, aggregator_handle)
+    (
+        tx,
+        flusher,
+        cancel_token,
+        aggregator_handle,
+        logs_agent_handle,
+    )
 }
 
 #[allow(clippy::type_complexity)]
@@ -1665,22 +1742,13 @@ async fn setup_telemetry_client(
     extension_id: &str,
     runtime_api: &str,
     logs_tx: Sender<TelemetryEvent>,
-    event_bus_tx: Sender<Event>,
     logs_enabled: bool,
     managed_instance_mode: bool,
-) -> anyhow::Result<CancellationToken> {
-    let listener = TelemetryListener::new(EXTENSION_HOST_IP, TELEMETRY_PORT, logs_tx, event_bus_tx);
+) -> anyhow::Result<(CancellationToken, TelemetryListener, JoinHandle<()>)> {
+    let listener = TelemetryListener::new(EXTENSION_HOST_IP, TELEMETRY_PORT, logs_tx);
 
     let cancel_token = listener.cancel_token();
-    match listener.start() {
-        Ok(()) => {
-            // Drop the listener, so event_bus_tx is closed
-            drop(listener);
-        }
-        Err(e) => {
-            error!("Error starting telemetry listener: {e:?}");
-        }
-    }
+    let telemetry_handle = listener.start();
 
     telemetry::subscribe(
         client,
@@ -1693,7 +1761,7 @@ async fn setup_telemetry_client(
     .await
     .map_err(|e| anyhow::anyhow!("Failed to subscribe to telemetry: {e:?}"))?;
 
-    Ok(cancel_token)
+    Ok((cancel_token, listener, telemetry_handle))
 }
 
 fn start_otlp_agent(

bottlecap/src/extension/telemetry/listener.rs

Lines changed: 16 additions & 28 deletions
@@ -1,5 +1,4 @@
 use crate::{
-    event_bus,
     extension::telemetry::events::TelemetryEvent,
     http::{extract_request_body, handler_not_found},
 };
@@ -12,7 +11,7 @@ use axum::{
     routing::post,
 };
 use std::net::SocketAddr;
-use tokio::{net::TcpListener, sync::mpsc::Sender};
+use tokio::{net::TcpListener, sync::mpsc::Sender, task::JoinHandle};
 use tokio_util::sync::CancellationToken;
 use tracing::debug;
 
@@ -23,24 +22,17 @@ pub struct TelemetryListener {
     port: u16,
     cancel_token: CancellationToken,
     logs_tx: Sender<TelemetryEvent>,
-    event_bus_tx: Sender<event_bus::Event>,
 }
 
 impl TelemetryListener {
     #[must_use]
-    pub fn new(
-        host: [u8; 4],
-        port: u16,
-        logs_tx: Sender<TelemetryEvent>,
-        event_bus_tx: Sender<event_bus::Event>,
-    ) -> Self {
+    pub fn new(host: [u8; 4], port: u16, logs_tx: Sender<TelemetryEvent>) -> Self {
         let cancel_token = CancellationToken::new();
         Self {
            host,
            port,
            cancel_token,
            logs_tx,
-            event_bus_tx,
        }
     }
 
@@ -49,24 +41,26 @@ impl TelemetryListener {
         self.cancel_token.clone()
     }
 
-    pub fn start(&self) -> Result<(), Box<dyn std::error::Error>> {
+    /// Starts the telemetry listener HTTP server.
+    /// Returns a `JoinHandle` that completes when the server has fully shut down,
+    /// ensuring all in-flight HTTP requests have been processed.
+    #[must_use]
+    pub fn start(&self) -> JoinHandle<()> {
         let socket = SocketAddr::from((self.host, self.port));
         let router = self.make_router();
 
         let cancel_token_clone = self.cancel_token();
-        let event_bus_tx = self.event_bus_tx.clone();
         tokio::spawn(async move {
            let listener = TcpListener::bind(&socket)
                .await
                .expect("Failed to bind socket");
            debug!("TELEMETRY API | Starting listener on {}", socket);
            axum::serve(listener, router)
-                .with_graceful_shutdown(Self::graceful_shutdown(cancel_token_clone, event_bus_tx))
+                .with_graceful_shutdown(Self::graceful_shutdown(cancel_token_clone))
                .await
                .expect("Failed to start telemetry listener");
-        });
-
-        Ok(())
+            debug!("TELEMETRY API | HTTP server fully shut down");
+        })
     }
 
     fn make_router(&self) -> Router {
@@ -78,19 +72,13 @@ impl TelemetryListener {
             .with_state(logs_tx)
     }
 
-    async fn graceful_shutdown(
-        cancel_token: CancellationToken,
-        event_bus_tx: Sender<event_bus::Event>,
-    ) {
+    async fn graceful_shutdown(cancel_token: CancellationToken) {
         cancel_token.cancelled().await;
-        debug!("TELEMETRY API | Shutdown signal received, sending tombstone event");
-
-        // Send tombstone event to signal shutdown
-        if let Err(e) = event_bus_tx.send(event_bus::Event::Tombstone).await {
-            debug!("TELEMETRY API |Failed to send tombstone event: {:?}", e);
-        }
-
-        debug!("TELEMETRY API | Shutting down");
+        debug!(
+            "TELEMETRY API | Shutdown signal received, initiating graceful HTTP server shutdown"
+        );
+        // Note: Tombstone event is now sent by the main shutdown sequence
+        // after all telemetry messages have been processed
     }
 
     async fn handle(State(logs_tx): State<Sender<TelemetryEvent>>, request: Request) -> Response {
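The listener change boils down to one pattern: spawn the axum server, tie its graceful shutdown to a CancellationToken, and hand back the JoinHandle so the caller can await full drain of in-flight requests. A minimal standalone sketch of that pattern (illustrative only, assuming axum 0.7+, tokio, and tokio-util; the port, route, and names are assumptions, not bottlecap's actual values):

use axum::{routing::get, Router};
use tokio::{net::TcpListener, task::JoinHandle};
use tokio_util::sync::CancellationToken;

fn start_server(cancel: CancellationToken) -> JoinHandle<()> {
    tokio::spawn(async move {
        // Bind inside the spawned task, mirroring the listener's start().
        let listener = TcpListener::bind("127.0.0.1:8124")
            .await
            .expect("failed to bind socket");
        let app = Router::new().route("/", get(|| async { "ok" }));
        axum::serve(listener, app)
            // Resolves once cancel() is called; axum then stops accepting
            // connections and finishes in-flight requests before returning.
            .with_graceful_shutdown(async move { cancel.cancelled().await })
            .await
            .expect("server error");
        // Reaching this line means the HTTP server has fully shut down.
    })
}

#[tokio::main]
async fn main() {
    let cancel = CancellationToken::new();
    let handle = start_server(cancel.clone());
    // ... later, during shutdown:
    cancel.cancel();
    handle.await.expect("server task panicked"); // all requests drained
}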
