
Commit 38dc5c1

An attempt to address data loss caused by a fundamental mismatch between synchronous signals and asynchronous effects
How It Works: the new shutdown sequence ensures messages are not lost by (a runnable sketch follows the commit metadata below):

1. Cancelling TelemetryListener and awaiting HTTP server shutdown
2. Dropping TelemetryListener to close the logs_tx channel
3. Cancelling LogsAgent and awaiting its completion (ensuring all logs are drained)
4. Sending the tombstone event only after all messages are processed
5. Dropping event_bus_tx and waiting for the tombstone to be processed
1 parent ea48140 commit 38dc5c1
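The sequence above is an ordered-drain pattern for tokio pipelines. Below is a minimal, self-contained sketch of the same ordering, assuming tokio and tokio-util as dependencies; the names here (Event, the producer and drainer tasks) are illustrative stand-ins, not bottlecap's actual types.

```rust
// Minimal sketch of the drain-then-tombstone shutdown order described above.
// Assumes: tokio = { version = "1", features = ["full"] }, tokio-util = "0.7"
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;

enum Event {
    Log(String),
    Tombstone,
}

#[tokio::main]
async fn main() {
    let (logs_tx, mut logs_rx) = mpsc::channel::<String>(64);
    let (event_bus_tx, mut event_bus) = mpsc::channel::<Event>(64);
    let cancel = CancellationToken::new();

    // Stand-in for the TelemetryListener: produces logs until cancelled.
    let producer_cancel = cancel.clone();
    let producer_tx = logs_tx.clone();
    let producer = tokio::spawn(async move {
        let mut n = 0;
        loop {
            tokio::select! {
                () = producer_cancel.cancelled() => break,
                _ = tokio::time::sleep(std::time::Duration::from_millis(10)) => {
                    let _ = producer_tx.send(format!("log {n}")).await;
                    n += 1;
                }
            }
        }
    });

    // Stand-in for the LogsAgent: drains logs_rx until the channel closes.
    let bus_tx = event_bus_tx.clone();
    let drainer = tokio::spawn(async move {
        while let Some(log) = logs_rx.recv().await {
            let _ = bus_tx.send(Event::Log(log)).await;
        }
        // recv() returned None: every sender is dropped and the queue is empty.
    });

    tokio::time::sleep(std::time::Duration::from_millis(50)).await;

    // Steps 1-2: signal the producer, then wait for its in-flight work to finish.
    cancel.cancel();
    producer.await.expect("producer panicked");
    // Step 3: drop the last logs_tx so the drainer's recv() can observe closure.
    drop(logs_tx);
    // Steps 4-5: wait until the drainer has forwarded every queued log.
    drainer.await.expect("drainer panicked");
    // Steps 6-7: only now is the tombstone guaranteed to be the final event.
    event_bus_tx.send(Event::Tombstone).await.expect("bus closed");
    drop(event_bus_tx);
    // Step 8: consume the bus until the tombstone arrives.
    while let Some(ev) = event_bus.recv().await {
        if matches!(ev, Event::Tombstone) {
            break;
        }
    }
}
```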

File tree

3 files changed: +382 additions, −80 deletions


bottlecap/src/bin/bottlecap/main.rs

Lines changed: 120 additions & 52 deletions
@@ -511,14 +511,19 @@ async fn extension_loop_active(
         .to_string();
     let tags_provider = setup_tag_provider(&Arc::clone(&aws_config), config, &account_id);
 
-    let (logs_agent_channel, logs_flusher, logs_agent_cancel_token, logs_aggregator_handle) =
-        start_logs_agent(
-            config,
-            Arc::clone(&api_key_factory),
-            &tags_provider,
-            event_bus_tx.clone(),
-            aws_config.is_managed_instance_mode(),
-        );
+    let (
+        logs_agent_channel,
+        logs_flusher,
+        logs_agent_cancel_token,
+        logs_aggregator_handle,
+        logs_agent_handle,
+    ) = start_logs_agent(
+        config,
+        Arc::clone(&api_key_factory),
+        &tags_provider,
+        event_bus_tx.clone(),
+        aws_config.is_managed_instance_mode(),
+    );
 
     let (metrics_flushers, metrics_aggregator_handle, dogstatsd_cancel_token) =
         start_dogstatsd(tags_provider.clone(), Arc::clone(&api_key_factory), config).await;
@@ -584,16 +589,16 @@ async fn extension_loop_active(
         }
     });
 
-    let telemetry_listener_cancel_token = setup_telemetry_client(
-        client,
-        &r.extension_id,
-        &aws_config.runtime_api,
-        logs_agent_channel,
-        event_bus_tx.clone(),
-        config.serverless_logs_enabled,
-        aws_config.is_managed_instance_mode(),
-    )
-    .await?;
+    let (telemetry_listener_cancel_token, telemetry_listener, telemetry_listener_handle) =
+        setup_telemetry_client(
+            client,
+            &r.extension_id,
+            &aws_config.runtime_api,
+            logs_agent_channel,
+            config.serverless_logs_enabled,
+            aws_config.is_managed_instance_mode(),
+        )
+        .await?;
 
     let otlp_cancel_token = start_otlp_agent(
         config,
@@ -780,8 +785,43 @@ async fn extension_loop_active(
     // Shutdown sequence
     debug!("Initiating shutdown sequence");
 
-    // Wait for tombstone event from telemetry listener to ensure all events are processed
-    // This is the result of code refactoring which is shared by OnDemand mode as well.
+    // Step 1: Cancel TelemetryListener HTTP server
+    debug!("SHUTDOWN | Step 1: Cancelling TelemetryListener");
+    telemetry_listener_cancel_token.cancel();
+
+    // Step 2: Await TelemetryListener HTTP server to fully shut down
+    // This ensures all in-flight HTTP requests are completed and logs_tx channel receives all messages
+    debug!("SHUTDOWN | Step 2: Awaiting TelemetryListener HTTP server shutdown");
+    if let Err(e) = telemetry_listener_handle.await {
+        error!("Error waiting for telemetry listener to shut down: {e:?}");
+    }
+
+    // Step 3: Drop TelemetryListener to close logs_tx channel
+    debug!("SHUTDOWN | Step 3: Dropping TelemetryListener to close logs_tx channel");
+    drop(telemetry_listener);
+
+    // Step 4: Cancel LogsAgent
+    debug!("SHUTDOWN | Step 4: Cancelling LogsAgent");
+    logs_agent_cancel_token.cancel();
+
+    // Step 5: Await LogsAgent to finish draining all logs from logs_tx channel
+    debug!("SHUTDOWN | Step 5: Awaiting LogsAgent to drain all logs");
+    if let Err(e) = logs_agent_handle.await {
+        error!("Error waiting for logs agent to shut down: {e:?}");
+    }
+
+    // Step 6: Send tombstone event to signal no more telemetry events
+    debug!("SHUTDOWN | Step 6: Sending tombstone event");
+    if let Err(e) = event_bus_tx.send(Event::Tombstone).await {
+        error!("Failed to send tombstone event: {e:?}");
+    }
+
+    // Step 7: Drop event_bus_tx to allow the event_bus channel to close
+    debug!("SHUTDOWN | Step 7: Dropping event_bus_tx");
+    drop(event_bus_tx);
+
+    // Step 8: Wait for tombstone event to be processed from event_bus
+    debug!("SHUTDOWN | Step 8: Waiting for tombstone event to be processed");
     wait_for_tombstone_event(
         &mut event_bus,
         &invocation_processor_handle,
@@ -794,13 +834,14 @@ async fn extension_loop_active(
     )
     .await;
 
-    // Cancel background tasks
+    // Step 9: Cancel other background services
+    debug!("SHUTDOWN | Step 9: Cancelling other background services");
     cancel_background_services(
         api_runtime_proxy_shutdown_signal.as_ref(),
         otlp_cancel_token.as_ref(),
         &trace_agent_shutdown_token,
         &dogstatsd_cancel_token,
-        &telemetry_listener_cancel_token,
+        &telemetry_listener_cancel_token, // Already cancelled, but included for completeness
         &lifecycle_listener_shutdown_token,
     );
 
@@ -1023,18 +1064,9 @@ async fn extension_loop_active(
         }
 
         if let NextEventResponse::Shutdown { .. } = maybe_shutdown_event {
-            // Cancel Telemetry API listener
-            // Important to do this first, so we can receive the Tombstone event which signals
-            // that there are no more Telemetry events to process
-            telemetry_listener_cancel_token.cancel();
-
-            // Cancel Logs Agent which might have Telemetry API events to process
-            logs_agent_cancel_token.cancel();
+            debug!("OnDemand mode: Initiating shutdown sequence");
 
-            // Drop the event bus sender to allow the channel to close properly
-            drop(event_bus_tx);
-
-            // Redrive/block on any failed payloads
+            // Redrive/block on any failed payloads before starting shutdown
             let tf = trace_flusher.clone();
             pending_flush_handles
                 .await_flush_handles(
@@ -1044,7 +1076,44 @@ async fn extension_loop_active(
                     &proxy_flusher,
                 )
                 .await;
-            // Wait for tombstone event from telemetry listener to ensure all events are processed
+
+            // Step 1: Cancel TelemetryListener HTTP server
+            debug!("SHUTDOWN | Step 1: Cancelling TelemetryListener");
+            telemetry_listener_cancel_token.cancel();
+
+            // Step 2: Await TelemetryListener HTTP server to fully shut down
+            // This ensures all in-flight HTTP requests are completed and logs_tx channel receives all messages
+            debug!("SHUTDOWN | Step 2: Awaiting TelemetryListener HTTP server shutdown");
+            if let Err(e) = telemetry_listener_handle.await {
+                error!("Error waiting for telemetry listener to shut down: {e:?}");
+            }
+
+            // Step 3: Drop TelemetryListener to close logs_tx channel
+            debug!("SHUTDOWN | Step 3: Dropping TelemetryListener to close logs_tx channel");
+            drop(telemetry_listener);
+
+            // Step 4: Cancel LogsAgent
+            debug!("SHUTDOWN | Step 4: Cancelling LogsAgent");
+            logs_agent_cancel_token.cancel();
+
+            // Step 5: Await LogsAgent to finish draining all logs from logs_tx channel
+            debug!("SHUTDOWN | Step 5: Awaiting LogsAgent to drain all logs");
+            if let Err(e) = logs_agent_handle.await {
+                error!("Error waiting for logs agent to shut down: {e:?}");
+            }
+
+            // Step 6: Send tombstone event to signal no more telemetry events
+            debug!("SHUTDOWN | Step 6: Sending tombstone event");
+            if let Err(e) = event_bus_tx.send(Event::Tombstone).await {
+                error!("Failed to send tombstone event: {e:?}");
+            }
+
+            // Step 7: Drop event_bus_tx to allow the event_bus channel to close
+            debug!("SHUTDOWN | Step 7: Dropping event_bus_tx");
+            drop(event_bus_tx);
+
+            // Step 8: Wait for tombstone event to be processed from event_bus
+            debug!("SHUTDOWN | Step 8: Waiting for tombstone event to be processed");
             wait_for_tombstone_event(
                 &mut event_bus,
                 &invocation_processor_handle,
@@ -1057,13 +1126,14 @@ async fn extension_loop_active(
             )
             .await;
 
-            // Cancel background services
+            // Step 9: Cancel other background services
+            debug!("SHUTDOWN | Step 9: Cancelling other background services");
             cancel_background_services(
                 api_runtime_proxy_shutdown_signal.as_ref(),
                 otlp_cancel_token.as_ref(),
                 &trace_agent_shutdown_token,
                 &dogstatsd_cancel_token,
-                &telemetry_listener_cancel_token,
+                &telemetry_listener_cancel_token, // Already cancelled, but included for completeness
                 &lifecycle_listener_shutdown_token,
             );
 
@@ -1410,6 +1480,7 @@ fn start_logs_agent(
     LogsFlusher,
     CancellationToken,
     LogsAggregatorHandle,
+    JoinHandle<()>,
 ) {
     let (aggregator_service, aggregator_handle) = LogsAggregatorService::default();
     // Start service in background
@@ -1425,16 +1496,22 @@ fn start_logs_agent(
         is_managed_instance_mode,
     );
     let cancel_token = agent.cancel_token();
-    // Start logs agent in background
-    tokio::spawn(async move {
+    // Start logs agent in background and return JoinHandle for awaitable completion
+    let logs_agent_handle = tokio::spawn(async move {
         agent.spin().await;
 
         debug!("LOGS_AGENT | Shutting down...");
         drop(agent);
     });
 
     let flusher = LogsFlusher::new(api_key_factory, aggregator_handle.clone(), config.clone());
-    (tx, flusher, cancel_token, aggregator_handle)
+    (
+        tx,
+        flusher,
+        cancel_token,
+        aggregator_handle,
+        logs_agent_handle,
+    )
 }
 
 #[allow(clippy::type_complexity)]
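The reason start_logs_agent now hands back the JoinHandle is the mismatch named in the commit title: CancellationToken::cancel() is a synchronous signal, while the task's cleanup is an asynchronous effect. A small illustrative snippet (not bottlecap code) of why awaiting the handle is what closes the race:

```rust
// Assumes tokio with "rt", "macros", "time" features and tokio-util = "0.7".
use tokio_util::sync::CancellationToken;

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let token = CancellationToken::new();
    let child = token.clone();
    let handle = tokio::spawn(async move {
        child.cancelled().await;
        // Cleanup still runs *after* cancel() has already returned:
        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
        "drained"
    });

    token.cancel(); // synchronous: only flips the token's state
    // Without this await, the program could proceed before the cleanup
    // above has happened -- the data-loss window this commit closes for
    // the TelemetryListener and LogsAgent.
    assert_eq!(handle.await.unwrap(), "drained");
}
```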
@@ -1665,22 +1742,13 @@ async fn setup_telemetry_client(
     extension_id: &str,
     runtime_api: &str,
     logs_tx: Sender<TelemetryEvent>,
-    event_bus_tx: Sender<Event>,
     logs_enabled: bool,
     managed_instance_mode: bool,
-) -> anyhow::Result<CancellationToken> {
-    let listener = TelemetryListener::new(EXTENSION_HOST_IP, TELEMETRY_PORT, logs_tx, event_bus_tx);
+) -> anyhow::Result<(CancellationToken, TelemetryListener, JoinHandle<()>)> {
+    let listener = TelemetryListener::new(EXTENSION_HOST_IP, TELEMETRY_PORT, logs_tx);
 
     let cancel_token = listener.cancel_token();
-    match listener.start() {
-        Ok(()) => {
-            // Drop the listener, so event_bus_tx is closed
-            drop(listener);
-        }
-        Err(e) => {
-            error!("Error starting telemetry listener: {e:?}");
-        }
-    }
+    let telemetry_handle = listener.start();
 
     telemetry::subscribe(
         client,
@@ -1693,7 +1761,7 @@ async fn setup_telemetry_client(
     .await
     .map_err(|e| anyhow::anyhow!("Failed to subscribe to telemetry: {e:?}"))?;
 
-    Ok(cancel_token)
+    Ok((cancel_token, listener, telemetry_handle))
 }
 
 fn start_otlp_agent(
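setup_telemetry_client previously dropped the listener right after start(); the new version returns it so the caller controls when the last logs_tx sender disappears. A simplified sketch of that ownership pattern, with a hypothetical Listener type standing in for TelemetryListener:

```rust
// Illustrative only: Listener and its fields are stand-ins, not bottlecap types.
use tokio::sync::mpsc::{self, Sender};
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;

struct Listener {
    logs_tx: Sender<String>, // channel stays open while this struct is alive
    cancel: CancellationToken,
}

impl Listener {
    fn start(&self) -> JoinHandle<()> {
        let tx = self.logs_tx.clone();
        let cancel = self.cancel.clone();
        tokio::spawn(async move {
            // Real code would serve HTTP here, forwarding events into tx.
            let _ = tx.send("event".into()).await;
            cancel.cancelled().await;
            // The clone of logs_tx drops here, but the caller's Listener
            // still holds one sender, so the channel is not yet closed.
        })
    }
}

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let (logs_tx, mut logs_rx) = mpsc::channel(8);
    let listener = Listener { logs_tx, cancel: CancellationToken::new() };
    let handle = listener.start();

    listener.cancel.cancel(); // Step 1: signal shutdown
    handle.await.unwrap();    // Step 2: server fully stopped
    drop(listener);           // Step 3: last sender gone

    // The consumer can now drain to None with nothing left in flight.
    while let Some(ev) = logs_rx.recv().await {
        println!("drained: {ev}");
    }
}
```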
