172 changes: 120 additions & 52 deletions bottlecap/src/bin/bottlecap/main.rs
@@ -511,14 +511,19 @@ async fn extension_loop_active(
.to_string();
let tags_provider = setup_tag_provider(&Arc::clone(&aws_config), config, &account_id);

let (logs_agent_channel, logs_flusher, logs_agent_cancel_token, logs_aggregator_handle) =
start_logs_agent(
config,
Arc::clone(&api_key_factory),
&tags_provider,
event_bus_tx.clone(),
aws_config.is_managed_instance_mode(),
);
let (
logs_agent_channel,
logs_flusher,
logs_agent_cancel_token,
logs_aggregator_handle,
logs_agent_handle,
) = start_logs_agent(
config,
Arc::clone(&api_key_factory),
&tags_provider,
event_bus_tx.clone(),
aws_config.is_managed_instance_mode(),
);

let (metrics_flushers, metrics_aggregator_handle, dogstatsd_cancel_token) =
start_dogstatsd(tags_provider.clone(), Arc::clone(&api_key_factory), config).await;
@@ -584,16 +589,16 @@ async fn extension_loop_active(
}
});

let telemetry_listener_cancel_token = setup_telemetry_client(
client,
&r.extension_id,
&aws_config.runtime_api,
logs_agent_channel,
event_bus_tx.clone(),
config.serverless_logs_enabled,
aws_config.is_managed_instance_mode(),
)
.await?;
let (telemetry_listener_cancel_token, telemetry_listener, telemetry_listener_handle) =
setup_telemetry_client(
client,
&r.extension_id,
&aws_config.runtime_api,
logs_agent_channel,
config.serverless_logs_enabled,
aws_config.is_managed_instance_mode(),
)
.await?;

let otlp_cancel_token = start_otlp_agent(
config,
@@ -780,8 +785,43 @@ async fn extension_loop_active(
// Shutdown sequence
debug!("Initiating shutdown sequence");

// Ordered shutdown: stop producers first, drain consumers, then tombstone the
// event bus. OnDemand mode below reuses the same sequence.
// Step 1: Cancel TelemetryListener HTTP server
debug!("SHUTDOWN | Step 1: Cancelling TelemetryListener");
telemetry_listener_cancel_token.cancel();

// Step 2: Await TelemetryListener HTTP server to fully shut down
// This ensures all in-flight HTTP requests complete and the logs_tx channel receives every message
debug!("SHUTDOWN | Step 2: Awaiting TelemetryListener HTTP server shutdown");
if let Err(e) = telemetry_listener_handle.await {
error!("Error waiting for telemetry listener to shut down: {e:?}");
}

// Step 3: Drop TelemetryListener to close logs_tx channel
debug!("SHUTDOWN | Step 3: Dropping TelemetryListener to close logs_tx channel");
drop(telemetry_listener);

// Step 4: Cancel LogsAgent
debug!("SHUTDOWN | Step 4: Cancelling LogsAgent");
logs_agent_cancel_token.cancel();

// Step 5: Await LogsAgent to finish draining all logs from logs_tx channel
debug!("SHUTDOWN | Step 5: Awaiting LogsAgent to drain all logs");
if let Err(e) = logs_agent_handle.await {
error!("Error waiting for logs agent to shut down: {e:?}");
}

// Step 6: Send tombstone event to signal no more telemetry events
debug!("SHUTDOWN | Step 6: Sending tombstone event");
if let Err(e) = event_bus_tx.send(Event::Tombstone).await {
error!("Failed to send tombstone event: {e:?}");
}

// Step 7: Drop event_bus_tx to allow the event_bus channel to close
debug!("SHUTDOWN | Step 7: Dropping event_bus_tx");
drop(event_bus_tx);

// Step 8: Wait for tombstone event to be processed from event_bus
debug!("SHUTDOWN | Step 8: Waiting for tombstone event to be processed");
wait_for_tombstone_event(
&mut event_bus,
&invocation_processor_handle,
@@ -794,13 +834,14 @@ async fn extension_loop_active(
)
.await;

// Cancel background tasks
// Step 9: Cancel other background services
debug!("SHUTDOWN | Step 9: Cancelling other background services");
cancel_background_services(
api_runtime_proxy_shutdown_signal.as_ref(),
otlp_cancel_token.as_ref(),
&trace_agent_shutdown_token,
&dogstatsd_cancel_token,
&telemetry_listener_cancel_token,
&telemetry_listener_cancel_token, // Already cancelled, but included for completeness
&lifecycle_listener_shutdown_token,
);
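
For review context, the ordering this diff enforces can be reduced to a minimal self-contained sketch: stop the producer and drop its sender, cancel and await the draining consumer, then tombstone the event bus. Everything below — the `Event` enum, `spawn_agent`, the channel sizes — is a hypothetical stand-in rather than this crate's actual types; only `tokio` and `tokio-util` are assumed as dependencies.

```rust
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;

#[derive(Debug, PartialEq)]
enum Event {
    Tombstone,
}

// Stand-in for LogsAgent: consume until cancelled, then drain to empty.
fn spawn_agent(mut rx: mpsc::Receiver<String>, cancel: CancellationToken) -> JoinHandle<()> {
    tokio::spawn(async move {
        loop {
            tokio::select! {
                maybe = rx.recv() => match maybe {
                    Some(log) => println!("processed: {log}"),
                    None => return, // channel closed and fully drained
                },
                () = cancel.cancelled() => break,
            }
        }
        // Cancelled: drain whatever is still buffered (Step 5 relies on this).
        while let Some(log) = rx.recv().await {
            println!("drained: {log}");
        }
    })
}

#[tokio::main]
async fn main() {
    let (logs_tx, logs_rx) = mpsc::channel::<String>(64);
    let (event_tx, mut event_rx) = mpsc::channel::<Event>(16);
    let cancel = CancellationToken::new();
    let agent = spawn_agent(logs_rx, cancel.clone());

    logs_tx.send("in-flight log".to_string()).await.unwrap();

    // Steps 1-3: stop the producer and drop its sender so the logs channel can close.
    drop(logs_tx);
    // Steps 4-5: cancel the agent, then await it; the drain loop above guarantees
    // the channel is empty before the task returns.
    cancel.cancel();
    agent.await.unwrap();
    // Steps 6-8: tombstone the event bus, drop the sender, wait for the marker.
    event_tx.send(Event::Tombstone).await.unwrap();
    drop(event_tx);
    while let Some(ev) = event_rx.recv().await {
        if ev == Event::Tombstone {
            break;
        }
    }
    // Step 9 would cancel any remaining background services here.
}
```

The key invariant is that every `logs_tx` sender is dropped before the agent is awaited, so the drain loop's `recv()` is guaranteed to terminate.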

Expand Down Expand Up @@ -1023,18 +1064,9 @@ async fn extension_loop_active(
}

if let NextEventResponse::Shutdown { .. } = maybe_shutdown_event {
// Cancel Telemetry API listener
// Important to do this first, so we can receive the Tombstone event which signals
// that there are no more Telemetry events to process
telemetry_listener_cancel_token.cancel();

// Cancel Logs Agent which might have Telemetry API events to process
logs_agent_cancel_token.cancel();
debug!("OnDemand mode: Initiating shutdown sequence");

// Drop the event bus sender to allow the channel to close properly
drop(event_bus_tx);

// Redrive/block on any failed payloads
// Redrive/block on any failed payloads before starting shutdown
let tf = trace_flusher.clone();
pending_flush_handles
.await_flush_handles(
@@ -1044,7 +1076,44 @@ async fn extension_loop_active(
&proxy_flusher,
)
.await;
// Same ordered shutdown sequence as above: stop producers first, drain
// consumers, then tombstone the event bus.

// Step 1: Cancel TelemetryListener HTTP server
debug!("SHUTDOWN | Step 1: Cancelling TelemetryListener");
telemetry_listener_cancel_token.cancel();

// Step 2: Await TelemetryListener HTTP server to fully shut down
// This ensures all in-flight HTTP requests complete and the logs_tx channel receives every message
debug!("SHUTDOWN | Step 2: Awaiting TelemetryListener HTTP server shutdown");
if let Err(e) = telemetry_listener_handle.await {
error!("Error waiting for telemetry listener to shut down: {e:?}");
}

// Step 3: Drop TelemetryListener to close logs_tx channel
debug!("SHUTDOWN | Step 3: Dropping TelemetryListener to close logs_tx channel");
drop(telemetry_listener);

// Step 4: Cancel LogsAgent
debug!("SHUTDOWN | Step 4: Cancelling LogsAgent");
logs_agent_cancel_token.cancel();

// Step 5: Await LogsAgent to finish draining all logs from logs_tx channel
debug!("SHUTDOWN | Step 5: Awaiting LogsAgent to drain all logs");
if let Err(e) = logs_agent_handle.await {
error!("Error waiting for logs agent to shut down: {e:?}");
}

// Step 6: Send tombstone event to signal no more telemetry events
debug!("SHUTDOWN | Step 6: Sending tombstone event");
if let Err(e) = event_bus_tx.send(Event::Tombstone).await {
error!("Failed to send tombstone event: {e:?}");
}

// Step 7: Drop event_bus_tx to allow the event_bus channel to close
debug!("SHUTDOWN | Step 7: Dropping event_bus_tx");
drop(event_bus_tx);

// Step 8: Wait for tombstone event to be processed from event_bus
debug!("SHUTDOWN | Step 8: Waiting for tombstone event to be processed");
wait_for_tombstone_event(
&mut event_bus,
&invocation_processor_handle,
@@ -1057,13 +1126,14 @@ async fn extension_loop_active(
)
.await;

// Cancel background services
// Step 9: Cancel other background services
debug!("SHUTDOWN | Step 9: Cancelling other background services");
cancel_background_services(
api_runtime_proxy_shutdown_signal.as_ref(),
otlp_cancel_token.as_ref(),
&trace_agent_shutdown_token,
&dogstatsd_cancel_token,
&telemetry_listener_cancel_token,
&telemetry_listener_cancel_token, // Already cancelled, but included for completeness
&lifecycle_listener_shutdown_token,
);

@@ -1410,6 +1480,7 @@ fn start_logs_agent(
LogsFlusher,
CancellationToken,
LogsAggregatorHandle,
JoinHandle<()>,
) {
let (aggregator_service, aggregator_handle) = LogsAggregatorService::default();
// Start service in background
@@ -1425,16 +1496,22 @@ fn start_logs_agent(
is_managed_instance_mode,
);
let cancel_token = agent.cancel_token();
// Start logs agent in background
tokio::spawn(async move {
// Start logs agent in background; return the JoinHandle so shutdown can await its completion
let logs_agent_handle = tokio::spawn(async move {
agent.spin().await;

debug!("LOGS_AGENT | Shutting down...");
drop(agent);
});

let flusher = LogsFlusher::new(api_key_factory, aggregator_handle.clone(), config.clone());
(tx, flusher, cancel_token, aggregator_handle)
(
tx,
flusher,
cancel_token,
aggregator_handle,
logs_agent_handle,
)
}
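
A side note on the `if let Err(e) = ..._handle.await` checks added in the shutdown steps: awaiting a `JoinHandle` fails only when the task panicked or was aborted; a task that returns normally after cancellation yields `Ok`. A small stand-alone illustration of that contract, using plain `tokio` and nothing from this crate:

```rust
use tokio::task;

#[tokio::main]
async fn main() {
    // Normal completion: await yields Ok with the task's return value.
    let ok = task::spawn(async { 21 * 2 });
    assert_eq!(ok.await.unwrap(), 42);

    // A panic inside the task surfaces as Err(JoinError) at the await site,
    // which is what the shutdown steps log instead of propagating.
    let panicky = task::spawn(async { panic!("boom") });
    let err = panicky.await.unwrap_err();
    assert!(err.is_panic());
}
```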

#[allow(clippy::type_complexity)]
@@ -1665,22 +1742,13 @@ async fn setup_telemetry_client(
extension_id: &str,
runtime_api: &str,
logs_tx: Sender<TelemetryEvent>,
event_bus_tx: Sender<Event>,
logs_enabled: bool,
managed_instance_mode: bool,
) -> anyhow::Result<CancellationToken> {
let listener = TelemetryListener::new(EXTENSION_HOST_IP, TELEMETRY_PORT, logs_tx, event_bus_tx);
) -> anyhow::Result<(CancellationToken, TelemetryListener, JoinHandle<()>)> {
let listener = TelemetryListener::new(EXTENSION_HOST_IP, TELEMETRY_PORT, logs_tx);

let cancel_token = listener.cancel_token();
match listener.start() {
Ok(()) => {
// Drop the listener, so event_bus_tx is closed
drop(listener);
}
Err(e) => {
error!("Error starting telemetry listener: {e:?}");
}
}
let telemetry_handle = listener.start();

telemetry::subscribe(
client,
Expand All @@ -1693,7 +1761,7 @@ async fn setup_telemetry_client(
.await
.map_err(|e| anyhow::anyhow!("Failed to subscribe to telemetry: {e:?}"))?;

Ok(cancel_token)
Ok((cancel_token, listener, telemetry_handle))
}
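
Why the listener is now returned to the caller instead of being dropped inside `setup_telemetry_client`: per shutdown Step 3, dropping the `TelemetryListener` is what closes the logs_tx channel, so the listener must hold a `Sender` (or a clone of one), and an mpsc channel only closes — letting the LogsAgent's drain loop observe `None` — once every sender is gone. A tiny sketch of that behavior with tokio's real `mpsc` API and placeholder payloads:

```rust
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<&str>(8);
    let tx_listener = tx.clone(); // stands in for the clone held by the listener

    tx.send("event from subscriber").await.unwrap();
    drop(tx); // one sender gone; the channel stays open...

    tx_listener.send("event from listener").await.unwrap();
    drop(tx_listener); // ...until the last sender is dropped (shutdown Step 3)

    assert_eq!(rx.recv().await, Some("event from subscriber"));
    assert_eq!(rx.recv().await, Some("event from listener"));
    assert_eq!(rx.recv().await, None); // closed and drained: the agent can exit
}
```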

fn start_otlp_agent(