Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/fix-process-callbacks.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@cloudflare/sandbox': patch
---

Fix process callbacks, PID capture, and getLogs race condition for fast commands
2 changes: 2 additions & 0 deletions packages/sandbox-container/src/core/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,8 @@ export interface ProcessRecord {
sessionId: string;
commandId: string;
};
// Promise that resolves when all streaming events have been processed
streamingComplete?: Promise<void>;
// For isolation layer (file-based IPC)
stdoutFile?: string;
stderrFile?: string;
Expand Down
58 changes: 47 additions & 11 deletions packages/sandbox-container/src/services/process-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,9 @@ export class ProcessService {
command,
async (event) => {
// Route events to process record listeners
if (event.type === 'stdout' && event.data) {
if (event.type === 'start' && event.pid !== undefined) {
await this.store.update(processRecord.id, { pid: event.pid });
} else if (event.type === 'stdout' && event.data) {
processRecord.stdout += event.data;
processRecord.outputListeners.forEach((listener) => {
listener('stdout', event.data!);
Expand Down Expand Up @@ -214,14 +216,15 @@ export class ProcessService {
return streamResult as ServiceResult<ProcessRecord>;
}

// Command is now tracked and first event processed - safe to return process record
// Continue streaming in background without blocking
streamResult.data.continueStreaming.catch((error) => {
this.logger.error('Failed to execute streaming command', error, {
processId: processRecord.id,
command
// Store streaming promise so getLogs() can await it for completed processes
// This ensures all output is captured before returning logs
processRecord.streamingComplete =
streamResult.data.continueStreaming.catch((error) => {
this.logger.error('Failed to execute streaming command', error, {
processId: processRecord.id,
command
});
});
});

return {
success: true,
Expand Down Expand Up @@ -252,9 +255,9 @@ export class ProcessService {

async getProcess(id: string): Promise<ServiceResult<ProcessRecord>> {
try {
const process = await this.store.get(id);
const processRecord = await this.store.get(id);

if (!process) {
if (!processRecord) {
return {
success: false,
error: {
Expand All @@ -267,9 +270,42 @@ export class ProcessService {
};
}

// Wait for streaming to finish to ensure all output is captured.
// The wait only applies when streamingComplete is set, and is triggered by
// either of two indicators:
// 1. Terminal status: command has finished, wait for streaming callbacks
// 2. PID check: if process is no longer alive, command finished, wait for streaming
// When streamingComplete is absent (e.g. the process was read from disk), the
// output is already complete and no wait is needed.
//
// For long-running processes (servers), PID is alive and status is 'running',
// so we return current output without blocking.
if (processRecord.streamingComplete) {
const isTerminal = ['completed', 'failed', 'killed', 'error'].includes(
processRecord.status
);

// Check if the subprocess is still alive (deterministic check for fast commands)
// If PID is set and subprocess is dead, the command has finished
let commandFinished = false;
if (processRecord.pid !== undefined) {
try {
// Signal 0 doesn't actually send a signal, just checks if process exists
process.kill(processRecord.pid, 0);
// Subprocess is still running
} catch {
// Subprocess is not running (either finished or doesn't exist)
commandFinished = true;
}
}

// Wait if status is terminal OR command has finished (for fast commands)
if (isTerminal || commandFinished) {
await processRecord.streamingComplete;
}
}

return {
success: true,
data: process
data: processRecord
};
} catch (error) {
const errorMessage =
Expand Down
8 changes: 4 additions & 4 deletions packages/sandbox-container/src/services/process-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,16 +61,16 @@ export class ProcessStore {
return;
}

const updated = { ...existing, ...data };
this.processes.set(id, updated);
// Mutate in place to preserve reference held by ProcessService for event routing
Object.assign(existing, data);

// Persist terminal states to disk and free memory
const isTerminal = ['completed', 'failed', 'killed', 'error'].includes(
updated.status
existing.status
);
if (isTerminal) {
try {
await this.writeProcessFile(id, updated);
await this.writeProcessFile(id, existing);
} catch (error) {
// Write failed, still delete to prevent memory leak
// Explicit tradeoff: container stability > process history
Expand Down
32 changes: 26 additions & 6 deletions packages/sandbox-container/src/services/session-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -220,18 +220,38 @@ export class SessionManager {

const session = sessionResult.data;

// Get async generator
const generator = session.execStream(command, { commandId, cwd, env });

// CRITICAL: Await first event to ensure command is tracked before returning
// This prevents race condition where killCommand() is called before trackCommand()
// Process 'start' event synchronously to capture PID before returning
// All other events stream in background via continueStreaming promise
// getLogs() awaits continueStreaming for completed processes to ensure
// all output is captured (deterministic, no timing heuristics)
const firstResult = await generator.next();

if (!firstResult.done) {
await onEvent(firstResult.value);
if (firstResult.done) {
return {
success: true,
data: { continueStreaming: Promise.resolve() }
};
}

await onEvent(firstResult.value);

// If already complete/error, drain remaining events synchronously
if (
firstResult.value.type === 'complete' ||
firstResult.value.type === 'error'
) {
for await (const event of generator) {
await onEvent(event);
}
return {
success: true,
data: { continueStreaming: Promise.resolve() }
};
}

// Create background task for remaining events
// Continue streaming remaining events in background
const continueStreaming = (async () => {
try {
for await (const event of generator) {
Expand Down
95 changes: 89 additions & 6 deletions packages/sandbox-container/src/session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,10 @@ export class Session {
const logFile = join(this.sessionDir!, `${commandId}.log`);
const exitCodeFile = join(this.sessionDir!, `${commandId}.exit`);
const pidFile = join(this.sessionDir!, `${commandId}.pid`);
const labelersDoneFile = join(
this.sessionDir!,
`${commandId}.labelers.done`
);

this.logger.info('Streaming command execution started', {
sessionId: this.id,
Expand Down Expand Up @@ -340,10 +344,22 @@ export class Session {
throw new Error('Shell stdin is not available');
}

// Wait for PID file to be created (bash script writes it after starting command)
const pid = await this.waitForPidFile(pidFile);

if (pid === undefined) {
this.logger.warn('PID file not created within timeout', {
sessionId: this.id,
commandId,
pidFile
});
}

yield {
type: 'start',
timestamp: new Date().toISOString(),
command
command,
pid
};

// Hybrid approach: poll log file until exit code is written
Expand Down Expand Up @@ -400,14 +416,41 @@ export class Session {
await Bun.sleep(CONFIG.STREAM_CHUNK_DELAY_MS);
}

// Read final chunks
/*
* Wait for labelers done marker file.
* The exit code file is written by the command subshell, but labelers
* run in parallel background processes. The background monitor creates
* the labelers done file after waiting for labelers to finish.
*/
const maxWaitMs = 5000;
const startWait = Date.now();
let labelersDone = false;
while (Date.now() - startWait < maxWaitMs) {
const doneFile = Bun.file(labelersDoneFile);
if (await doneFile.exists()) {
labelersDone = true;
break;
}
await Bun.sleep(CONFIG.STREAM_CHUNK_DELAY_MS);
}

if (!labelersDone) {
this.logger.warn('Output capture timeout - logs may be incomplete', {
commandId,
sessionId: this.id,
timeoutMs: maxWaitMs
});
}

// Read final chunks from log file after labelers are done
const file = Bun.file(logFile);
if (await file.exists()) {
const content = await file.text();
const newContent = content.slice(position);
const logContent = await file.text();
const finalContent = logContent.slice(position);

if (newContent) {
const lines = newContent.split('\n');
// Process final chunks
if (finalContent) {
const lines = finalContent.split('\n');
for (const line of lines) {
if (!line) continue;

Expand All @@ -428,6 +471,13 @@ export class Session {
}
}

// Clean up labelers done file
try {
await rm(labelersDoneFile, { force: true });
} catch {
// Ignore cleanup errors
}

// Parse exit code (already read during polling loop)
const exitCode = parseInt(exitCodeContent, 10);
if (Number.isNaN(exitCode)) {
Expand Down Expand Up @@ -629,6 +679,7 @@ export class Session {
const stdoutPipe = join(this.sessionDir!, `${cmdId}.stdout.pipe`);
const stderrPipe = join(this.sessionDir!, `${cmdId}.stderr.pipe`);
const pidFile = join(this.sessionDir!, `${cmdId}.pid`);
const labelersDoneFile = join(this.sessionDir!, `${cmdId}.labelers.done`);

// Escape paths for safe shell usage
const safeStdoutPipe = this.escapeShellPath(stdoutPipe);
Expand All @@ -637,6 +688,7 @@ export class Session {
const safeExitCodeFile = this.escapeShellPath(exitCodeFile);
const safeSessionDir = this.escapeShellPath(this.sessionDir!);
const safePidFile = this.escapeShellPath(pidFile);
const safeLabelersDoneFile = this.escapeShellPath(labelersDoneFile);

const indentLines = (input: string, spaces: number) => {
const prefix = ' '.repeat(spaces);
Expand Down Expand Up @@ -722,6 +774,7 @@ export class Session {
script += ` (\n`;
script += ` wait "$r1" "$r2" 2>/dev/null\n`;
script += ` rm -f "$sp" "$ep"\n`;
script += ` touch ${safeLabelersDoneFile}\n`;
script += ` ) &\n`;
script += ` # Restore directory immediately\n`;
script += ` cd "$PREV_DIR"\n`;
Expand All @@ -745,6 +798,7 @@ export class Session {
script += ` (\n`;
script += ` wait "$r1" "$r2" 2>/dev/null\n`;
script += ` rm -f "$sp" "$ep"\n`;
script += ` touch ${safeLabelersDoneFile}\n`;
script += ` ) &\n`;
}
} else {
Expand Down Expand Up @@ -1003,6 +1057,35 @@ export class Session {
}
}

/**
 * Poll for the PID file and return the PID it contains.
 *
 * The file is re-read every 10ms until it holds a valid PID or the timeout
 * elapses. A file that exists but is empty, or whose content is not a
 * well-formed PID, is treated as "not ready yet" and polling continues.
 *
 * @param pidFile - Path of the PID file to wait for.
 * @param timeoutMs - Maximum time to keep polling, in milliseconds.
 * @returns The parsed PID, or undefined if no valid PID appeared within the timeout.
 */
private async waitForPidFile(
  pidFile: string,
  timeoutMs: number = 1000
): Promise<number | undefined> {
  const pollIntervalMs = 10;
  const deadline = Date.now() + timeoutMs;

  while (Date.now() < deadline) {
    try {
      const file = Bun.file(pidFile);
      if (await file.exists()) {
        const content = (await file.text()).trim();
        // Require a purely numeric string: parseInt alone would accept
        // trailing garbage ("123abc") and negative values.
        if (/^\d+$/.test(content)) {
          const pid = Number.parseInt(content, 10);
          // Reject 0 and out-of-range values: callers hand this PID to
          // process.kill(), where 0 or a negative value addresses a
          // process group rather than a single process.
          if (Number.isSafeInteger(pid) && pid > 0) {
            return pid;
          }
        }
      }
    } catch {
      // Ignore read errors and keep polling until the deadline
    }
    await Bun.sleep(pollIntervalMs);
  }

  return undefined;
}

/**
* Escape shell path for safe usage in bash scripts
*/
Expand Down
Loading
Loading