Skip to content

Commit 204f9ac

Browse files
Fix process callbacks, PID capture, and getLogs race (#267)
* Fix process callbacks, PID capture, and getLogs race For background execution, the exit code file was written before output labelers finished writing to the log. Added a marker file that signals when labelers complete, ensuring all output is captured before getLogs reads the log file. * Add E2E test for PID and immediate logs * Use PID check to await streaming for completed commands For fast commands like 'echo', getLogs() needs complete output without artificial delays. For long-running processes like servers, it should return current output without blocking. The subprocess PID provides a deterministic way to distinguish these cases: dead PID means command finished and we should wait for streaming callbacks to complete. Live PID means command is still running and we return immediately. * Add warning log when PID file not created within timeout
1 parent 451ca39 commit 204f9ac

File tree

9 files changed

+240
-32
lines changed

9 files changed

+240
-32
lines changed
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@cloudflare/sandbox': patch
3+
---
4+
5+
Fix process callbacks, PID capture, and getLogs race condition for fast commands

packages/sandbox-container/src/core/types.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,8 @@ export interface ProcessRecord {
246246
sessionId: string;
247247
commandId: string;
248248
};
249+
// Promise that resolves when all streaming events have been processed
250+
streamingComplete?: Promise<void>;
249251
// For isolation layer (file-based IPC)
250252
stdoutFile?: string;
251253
stderrFile?: string;

packages/sandbox-container/src/services/process-service.ts

Lines changed: 47 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,9 @@ export class ProcessService {
150150
command,
151151
async (event) => {
152152
// Route events to process record listeners
153-
if (event.type === 'stdout' && event.data) {
153+
if (event.type === 'start' && event.pid !== undefined) {
154+
await this.store.update(processRecord.id, { pid: event.pid });
155+
} else if (event.type === 'stdout' && event.data) {
154156
processRecord.stdout += event.data;
155157
processRecord.outputListeners.forEach((listener) => {
156158
listener('stdout', event.data!);
@@ -214,14 +216,15 @@ export class ProcessService {
214216
return streamResult as ServiceResult<ProcessRecord>;
215217
}
216218

217-
// Command is now tracked and first event processed - safe to return process record
218-
// Continue streaming in background without blocking
219-
streamResult.data.continueStreaming.catch((error) => {
220-
this.logger.error('Failed to execute streaming command', error, {
221-
processId: processRecord.id,
222-
command
219+
// Store streaming promise so getLogs() can await it for completed processes
220+
// This ensures all output is captured before returning logs
221+
processRecord.streamingComplete =
222+
streamResult.data.continueStreaming.catch((error) => {
223+
this.logger.error('Failed to execute streaming command', error, {
224+
processId: processRecord.id,
225+
command
226+
});
223227
});
224-
});
225228

226229
return {
227230
success: true,
@@ -252,9 +255,9 @@ export class ProcessService {
252255

253256
async getProcess(id: string): Promise<ServiceResult<ProcessRecord>> {
254257
try {
255-
const process = await this.store.get(id);
258+
const processRecord = await this.store.get(id);
256259

257-
if (!process) {
260+
if (!processRecord) {
258261
return {
259262
success: false,
260263
error: {
@@ -267,9 +270,42 @@ export class ProcessService {
267270
};
268271
}
269272

273+
// Wait for streaming to finish to ensure all output is captured
274+
// We use three indicators to decide whether to wait:
275+
// 1. Terminal status: command has finished, wait for streaming callbacks
276+
// 2. PID check: if process is no longer alive, command finished, wait for streaming
277+
// 3. No streamingComplete: process was read from disk, output is complete
278+
//
279+
// For long-running processes (servers), PID is alive and status is 'running',
280+
// so we return current output without blocking.
281+
if (processRecord.streamingComplete) {
282+
const isTerminal = ['completed', 'failed', 'killed', 'error'].includes(
283+
processRecord.status
284+
);
285+
286+
// Check if the subprocess is still alive (deterministic check for fast commands)
287+
// If PID is set and subprocess is dead, the command has finished
288+
let commandFinished = false;
289+
if (processRecord.pid !== undefined) {
290+
try {
291+
// Signal 0 doesn't actually send a signal, just checks if process exists
292+
process.kill(processRecord.pid, 0);
293+
// Subprocess is still running
294+
} catch {
295+
// Subprocess is not running (either finished or doesn't exist)
296+
commandFinished = true;
297+
}
298+
}
299+
300+
// Wait if status is terminal OR command has finished (for fast commands)
301+
if (isTerminal || commandFinished) {
302+
await processRecord.streamingComplete;
303+
}
304+
}
305+
270306
return {
271307
success: true,
272-
data: process
308+
data: processRecord
273309
};
274310
} catch (error) {
275311
const errorMessage =

packages/sandbox-container/src/services/process-store.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -61,16 +61,16 @@ export class ProcessStore {
6161
return;
6262
}
6363

64-
const updated = { ...existing, ...data };
65-
this.processes.set(id, updated);
64+
// Mutate in place to preserve reference held by ProcessService for event routing
65+
Object.assign(existing, data);
6666

6767
// Persist terminal states to disk and free memory
6868
const isTerminal = ['completed', 'failed', 'killed', 'error'].includes(
69-
updated.status
69+
existing.status
7070
);
7171
if (isTerminal) {
7272
try {
73-
await this.writeProcessFile(id, updated);
73+
await this.writeProcessFile(id, existing);
7474
} catch (error) {
7575
// Write failed, still delete to prevent memory leak
7676
// Explicit tradeoff: container stability > process history

packages/sandbox-container/src/services/session-manager.ts

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -220,18 +220,38 @@ export class SessionManager {
220220

221221
const session = sessionResult.data;
222222

223-
// Get async generator
224223
const generator = session.execStream(command, { commandId, cwd, env });
225224

226-
// CRITICAL: Await first event to ensure command is tracked before returning
227-
// This prevents race condition where killCommand() is called before trackCommand()
225+
// Process 'start' event synchronously to capture PID before returning
226+
// All other events stream in background via continueStreaming promise
227+
// getLogs() awaits continueStreaming for completed processes to ensure
228+
// all output is captured (deterministic, no timing heuristics)
228229
const firstResult = await generator.next();
229230

230-
if (!firstResult.done) {
231-
await onEvent(firstResult.value);
231+
if (firstResult.done) {
232+
return {
233+
success: true,
234+
data: { continueStreaming: Promise.resolve() }
235+
};
236+
}
237+
238+
await onEvent(firstResult.value);
239+
240+
// If already complete/error, drain remaining events synchronously
241+
if (
242+
firstResult.value.type === 'complete' ||
243+
firstResult.value.type === 'error'
244+
) {
245+
for await (const event of generator) {
246+
await onEvent(event);
247+
}
248+
return {
249+
success: true,
250+
data: { continueStreaming: Promise.resolve() }
251+
};
232252
}
233253

234-
// Create background task for remaining events
254+
// Continue streaming remaining events in background
235255
const continueStreaming = (async () => {
236256
try {
237257
for await (const event of generator) {

packages/sandbox-container/src/session.ts

Lines changed: 89 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -310,6 +310,10 @@ export class Session {
310310
const logFile = join(this.sessionDir!, `${commandId}.log`);
311311
const exitCodeFile = join(this.sessionDir!, `${commandId}.exit`);
312312
const pidFile = join(this.sessionDir!, `${commandId}.pid`);
313+
const labelersDoneFile = join(
314+
this.sessionDir!,
315+
`${commandId}.labelers.done`
316+
);
313317

314318
this.logger.info('Streaming command execution started', {
315319
sessionId: this.id,
@@ -340,10 +344,22 @@ export class Session {
340344
throw new Error('Shell stdin is not available');
341345
}
342346

347+
// Wait for PID file to be created (bash script writes it after starting command)
348+
const pid = await this.waitForPidFile(pidFile);
349+
350+
if (pid === undefined) {
351+
this.logger.warn('PID file not created within timeout', {
352+
sessionId: this.id,
353+
commandId,
354+
pidFile
355+
});
356+
}
357+
343358
yield {
344359
type: 'start',
345360
timestamp: new Date().toISOString(),
346-
command
361+
command,
362+
pid
347363
};
348364

349365
// Hybrid approach: poll log file until exit code is written
@@ -400,14 +416,41 @@ export class Session {
400416
await Bun.sleep(CONFIG.STREAM_CHUNK_DELAY_MS);
401417
}
402418

403-
// Read final chunks
419+
/*
420+
* Wait for labelers done marker file.
421+
* The exit code file is written by the command subshell, but labelers
422+
* run in parallel background processes. The background monitor creates
423+
* the labelers done file after waiting for labelers to finish.
424+
*/
425+
const maxWaitMs = 5000;
426+
const startWait = Date.now();
427+
let labelersDone = false;
428+
while (Date.now() - startWait < maxWaitMs) {
429+
const doneFile = Bun.file(labelersDoneFile);
430+
if (await doneFile.exists()) {
431+
labelersDone = true;
432+
break;
433+
}
434+
await Bun.sleep(CONFIG.STREAM_CHUNK_DELAY_MS);
435+
}
436+
437+
if (!labelersDone) {
438+
this.logger.warn('Output capture timeout - logs may be incomplete', {
439+
commandId,
440+
sessionId: this.id,
441+
timeoutMs: maxWaitMs
442+
});
443+
}
444+
445+
// Read final chunks from log file after labelers are done
404446
const file = Bun.file(logFile);
405447
if (await file.exists()) {
406-
const content = await file.text();
407-
const newContent = content.slice(position);
448+
const logContent = await file.text();
449+
const finalContent = logContent.slice(position);
408450

409-
if (newContent) {
410-
const lines = newContent.split('\n');
451+
// Process final chunks
452+
if (finalContent) {
453+
const lines = finalContent.split('\n');
411454
for (const line of lines) {
412455
if (!line) continue;
413456

@@ -428,6 +471,13 @@ export class Session {
428471
}
429472
}
430473

474+
// Clean up labelers done file
475+
try {
476+
await rm(labelersDoneFile, { force: true });
477+
} catch {
478+
// Ignore cleanup errors
479+
}
480+
431481
// Parse exit code (already read during polling loop)
432482
const exitCode = parseInt(exitCodeContent, 10);
433483
if (Number.isNaN(exitCode)) {
@@ -629,6 +679,7 @@ export class Session {
629679
const stdoutPipe = join(this.sessionDir!, `${cmdId}.stdout.pipe`);
630680
const stderrPipe = join(this.sessionDir!, `${cmdId}.stderr.pipe`);
631681
const pidFile = join(this.sessionDir!, `${cmdId}.pid`);
682+
const labelersDoneFile = join(this.sessionDir!, `${cmdId}.labelers.done`);
632683

633684
// Escape paths for safe shell usage
634685
const safeStdoutPipe = this.escapeShellPath(stdoutPipe);
@@ -637,6 +688,7 @@ export class Session {
637688
const safeExitCodeFile = this.escapeShellPath(exitCodeFile);
638689
const safeSessionDir = this.escapeShellPath(this.sessionDir!);
639690
const safePidFile = this.escapeShellPath(pidFile);
691+
const safeLabelersDoneFile = this.escapeShellPath(labelersDoneFile);
640692

641693
const indentLines = (input: string, spaces: number) => {
642694
const prefix = ' '.repeat(spaces);
@@ -722,6 +774,7 @@ export class Session {
722774
script += ` (\n`;
723775
script += ` wait "$r1" "$r2" 2>/dev/null\n`;
724776
script += ` rm -f "$sp" "$ep"\n`;
777+
script += ` touch ${safeLabelersDoneFile}\n`;
725778
script += ` ) &\n`;
726779
script += ` # Restore directory immediately\n`;
727780
script += ` cd "$PREV_DIR"\n`;
@@ -745,6 +798,7 @@ export class Session {
745798
script += ` (\n`;
746799
script += ` wait "$r1" "$r2" 2>/dev/null\n`;
747800
script += ` rm -f "$sp" "$ep"\n`;
801+
script += ` touch ${safeLabelersDoneFile}\n`;
748802
script += ` ) &\n`;
749803
}
750804
} else {
@@ -1003,6 +1057,35 @@ export class Session {
10031057
}
10041058
}
10051059

1060+
/**
1061+
* Wait for PID file to be created and return the PID
1062+
* Returns undefined if file doesn't appear within timeout
1063+
*/
1064+
private async waitForPidFile(
1065+
pidFile: string,
1066+
timeoutMs: number = 1000
1067+
): Promise<number | undefined> {
1068+
const startTime = Date.now();
1069+
1070+
while (Date.now() - startTime < timeoutMs) {
1071+
try {
1072+
const file = Bun.file(pidFile);
1073+
if (await file.exists()) {
1074+
const content = await file.text();
1075+
const pid = parseInt(content.trim(), 10);
1076+
if (!Number.isNaN(pid)) {
1077+
return pid;
1078+
}
1079+
}
1080+
} catch {
1081+
// Ignore errors, keep polling
1082+
}
1083+
await Bun.sleep(10); // Poll every 10ms
1084+
}
1085+
1086+
return undefined;
1087+
}
1088+
10061089
/**
10071090
* Escape shell path for safe usage in bash scripts
10081091
*/

0 commit comments

Comments
 (0)