Skip to content

Commit e892093

Browse files
committed
Fix process callbacks, PID capture, and getLogs race
For background execution, the exit code file could be written before the output labelers had finished writing to the log, so getLogs could read the log file while output was still missing. This commit adds a marker file that signals when the labelers have completed, ensuring all output is captured before getLogs reads the log file.
1 parent 392f1ba commit e892093

File tree

7 files changed

+218
-17
lines changed

7 files changed

+218
-17
lines changed
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@cloudflare/sandbox': patch
3+
---
4+
5+
Fix process callbacks, PID capture, and getLogs race condition for fast commands

packages/sandbox-container/src/services/process-service.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,9 @@ export class ProcessService {
150150
command,
151151
async (event) => {
152152
// Route events to process record listeners
153-
if (event.type === 'stdout' && event.data) {
153+
if (event.type === 'start' && event.pid !== undefined) {
154+
await this.store.update(processRecord.id, { pid: event.pid });
155+
} else if (event.type === 'stdout' && event.data) {
154156
processRecord.stdout += event.data;
155157
processRecord.outputListeners.forEach((listener) => {
156158
listener('stdout', event.data!);

packages/sandbox-container/src/services/process-store.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -61,16 +61,16 @@ export class ProcessStore {
6161
return;
6262
}
6363

64-
const updated = { ...existing, ...data };
65-
this.processes.set(id, updated);
64+
// Mutate in place to preserve reference held by ProcessService for event routing
65+
Object.assign(existing, data);
6666

6767
// Persist terminal states to disk and free memory
6868
const isTerminal = ['completed', 'failed', 'killed', 'error'].includes(
69-
updated.status
69+
existing.status
7070
);
7171
if (isTerminal) {
7272
try {
73-
await this.writeProcessFile(id, updated);
73+
await this.writeProcessFile(id, existing);
7474
} catch (error) {
7575
// Write failed, still delete to prevent memory leak
7676
// Explicit tradeoff: container stability > process history

packages/sandbox-container/src/services/session-manager.ts

Lines changed: 64 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -220,18 +220,76 @@ export class SessionManager {
220220

221221
const session = sessionResult.data;
222222

223-
// Get async generator
224223
const generator = session.execStream(command, { commandId, cwd, env });
225224

226-
// CRITICAL: Await first event to ensure command is tracked before returning
227-
// This prevents race condition where killCommand() is called before trackCommand()
225+
/*
226+
* EVENT DRAINING STRATEGY
227+
*
228+
* We process the first 2 events synchronously before returning. Why 2?
229+
*
230+
* Event sequence for commands:
231+
* Fast command (echo 123): start → stdout → complete
232+
* No-output command (true): start → complete
233+
* Long-running (sleep 10): start → ... (events trickle in over time)
234+
*
235+
* - Event 1 is normally 'start' (carrying the PID if the PID file was read in time) - must await to track the command
236+
* - Event 2 is typically 'stdout' (first output) or 'complete' (no output)
237+
*
238+
* By processing 2 events, getLogs() returns correct data for fast commands
239+
* like "echo 123" without blocking long-running commands unnecessarily.
240+
* This is a heuristic - commands with multiple stdout chunks may have
241+
* partial output if getLogs() is called immediately after startProcess().
242+
*/
243+
228244
const firstResult = await generator.next();
229245

230-
if (!firstResult.done) {
231-
await onEvent(firstResult.value);
246+
if (firstResult.done) {
247+
return {
248+
success: true,
249+
data: { continueStreaming: Promise.resolve() }
250+
};
251+
}
252+
253+
await onEvent(firstResult.value);
254+
255+
if (
256+
firstResult.value.type === 'complete' ||
257+
firstResult.value.type === 'error'
258+
) {
259+
for await (const event of generator) {
260+
await onEvent(event);
261+
}
262+
return {
263+
success: true,
264+
data: { continueStreaming: Promise.resolve() }
265+
};
266+
}
267+
268+
const secondResult = await generator.next();
269+
270+
if (secondResult.done) {
271+
return {
272+
success: true,
273+
data: { continueStreaming: Promise.resolve() }
274+
};
275+
}
276+
277+
await onEvent(secondResult.value);
278+
279+
if (
280+
secondResult.value.type === 'complete' ||
281+
secondResult.value.type === 'error'
282+
) {
283+
for await (const event of generator) {
284+
await onEvent(event);
285+
}
286+
return {
287+
success: true,
288+
data: { continueStreaming: Promise.resolve() }
289+
};
232290
}
233291

234-
// Create background task for remaining events
292+
// Process still running after 2 events - continue in background
235293
const continueStreaming = (async () => {
236294
try {
237295
for await (const event of generator) {

packages/sandbox-container/src/session.ts

Lines changed: 81 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -310,6 +310,10 @@ export class Session {
310310
const logFile = join(this.sessionDir!, `${commandId}.log`);
311311
const exitCodeFile = join(this.sessionDir!, `${commandId}.exit`);
312312
const pidFile = join(this.sessionDir!, `${commandId}.pid`);
313+
const labelersDoneFile = join(
314+
this.sessionDir!,
315+
`${commandId}.labelers.done`
316+
);
313317

314318
this.logger.info('Streaming command execution started', {
315319
sessionId: this.id,
@@ -340,10 +344,14 @@ export class Session {
340344
throw new Error('Shell stdin is not available');
341345
}
342346

347+
// Wait for PID file to be created (bash script writes it after starting command)
348+
const pid = await this.waitForPidFile(pidFile);
349+
343350
yield {
344351
type: 'start',
345352
timestamp: new Date().toISOString(),
346-
command
353+
command,
354+
pid
347355
};
348356

349357
// Hybrid approach: poll log file until exit code is written
@@ -400,14 +408,41 @@ export class Session {
400408
await Bun.sleep(CONFIG.STREAM_CHUNK_DELAY_MS);
401409
}
402410

403-
// Read final chunks
411+
/*
412+
* Wait for labelers done marker file.
413+
* The exit code file is written by the command subshell, but labelers
414+
* run in parallel background processes. The background monitor creates
415+
* the labelers done file after waiting for labelers to finish.
416+
*/
417+
const maxWaitMs = 5000;
418+
const startWait = Date.now();
419+
let labelersDone = false;
420+
while (Date.now() - startWait < maxWaitMs) {
421+
const doneFile = Bun.file(labelersDoneFile);
422+
if (await doneFile.exists()) {
423+
labelersDone = true;
424+
break;
425+
}
426+
await Bun.sleep(CONFIG.STREAM_CHUNK_DELAY_MS);
427+
}
428+
429+
if (!labelersDone) {
430+
this.logger.warn('Output capture timeout - logs may be incomplete', {
431+
commandId,
432+
sessionId: this.id,
433+
timeoutMs: maxWaitMs
434+
});
435+
}
436+
437+
// Read final chunks from log file after labelers are done
404438
const file = Bun.file(logFile);
405439
if (await file.exists()) {
406-
const content = await file.text();
407-
const newContent = content.slice(position);
440+
const logContent = await file.text();
441+
const finalContent = logContent.slice(position);
408442

409-
if (newContent) {
410-
const lines = newContent.split('\n');
443+
// Process final chunks
444+
if (finalContent) {
445+
const lines = finalContent.split('\n');
411446
for (const line of lines) {
412447
if (!line) continue;
413448

@@ -428,6 +463,13 @@ export class Session {
428463
}
429464
}
430465

466+
// Clean up labelers done file
467+
try {
468+
await rm(labelersDoneFile, { force: true });
469+
} catch {
470+
// Ignore cleanup errors
471+
}
472+
431473
// Parse exit code (already read during polling loop)
432474
const exitCode = parseInt(exitCodeContent, 10);
433475
if (Number.isNaN(exitCode)) {
@@ -629,6 +671,7 @@ export class Session {
629671
const stdoutPipe = join(this.sessionDir!, `${cmdId}.stdout.pipe`);
630672
const stderrPipe = join(this.sessionDir!, `${cmdId}.stderr.pipe`);
631673
const pidFile = join(this.sessionDir!, `${cmdId}.pid`);
674+
const labelersDoneFile = join(this.sessionDir!, `${cmdId}.labelers.done`);
632675

633676
// Escape paths for safe shell usage
634677
const safeStdoutPipe = this.escapeShellPath(stdoutPipe);
@@ -637,6 +680,7 @@ export class Session {
637680
const safeExitCodeFile = this.escapeShellPath(exitCodeFile);
638681
const safeSessionDir = this.escapeShellPath(this.sessionDir!);
639682
const safePidFile = this.escapeShellPath(pidFile);
683+
const safeLabelersDoneFile = this.escapeShellPath(labelersDoneFile);
640684

641685
const indentLines = (input: string, spaces: number) => {
642686
const prefix = ' '.repeat(spaces);
@@ -722,6 +766,7 @@ export class Session {
722766
script += ` (\n`;
723767
script += ` wait "$r1" "$r2" 2>/dev/null\n`;
724768
script += ` rm -f "$sp" "$ep"\n`;
769+
script += ` touch ${safeLabelersDoneFile}\n`;
725770
script += ` ) &\n`;
726771
script += ` # Restore directory immediately\n`;
727772
script += ` cd "$PREV_DIR"\n`;
@@ -745,6 +790,7 @@ export class Session {
745790
script += ` (\n`;
746791
script += ` wait "$r1" "$r2" 2>/dev/null\n`;
747792
script += ` rm -f "$sp" "$ep"\n`;
793+
script += ` touch ${safeLabelersDoneFile}\n`;
748794
script += ` ) &\n`;
749795
}
750796
} else {
@@ -1003,6 +1049,35 @@ export class Session {
10031049
}
10041050
}
10051051

1052+
/**
1053+
* Wait for PID file to be created and return the PID
1054+
* Returns undefined if no valid PID can be read from the file within the timeout
1055+
*/
1056+
private async waitForPidFile(
1057+
pidFile: string,
1058+
timeoutMs: number = 1000
1059+
): Promise<number | undefined> {
1060+
const startTime = Date.now();
1061+
1062+
while (Date.now() - startTime < timeoutMs) {
1063+
try {
1064+
const file = Bun.file(pidFile);
1065+
if (await file.exists()) {
1066+
const content = await file.text();
1067+
const pid = parseInt(content.trim(), 10);
1068+
if (!Number.isNaN(pid)) {
1069+
return pid;
1070+
}
1071+
}
1072+
} catch {
1073+
// Ignore errors, keep polling
1074+
}
1075+
await Bun.sleep(10); // Poll every 10ms
1076+
}
1077+
1078+
return undefined;
1079+
}
1080+
10061081
/**
10071082
* Escape shell path for safe usage in bash scripts
10081083
*/

packages/sandbox/src/sandbox.ts

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1289,6 +1289,16 @@ export class Sandbox<Env = unknown> extends Container<Env> implements ISandbox {
12891289
options.onStart(processObj);
12901290
}
12911291

1292+
// Start background streaming if output/exit callbacks are provided
1293+
if (options?.onOutput || options?.onExit) {
1294+
// Fire and forget - don't await, let it run in background
1295+
this.startProcessCallbackStream(response.processId, options).catch(
1296+
() => {
1297+
// Error already handled in startProcessCallbackStream
1298+
}
1299+
);
1300+
}
1301+
12921302
return processObj;
12931303
} catch (error) {
12941304
if (options?.onError && error instanceof Error) {
@@ -1299,6 +1309,56 @@ export class Sandbox<Env = unknown> extends Container<Env> implements ISandbox {
12991309
}
13001310
}
13011311

1312+
/**
1313+
* Start background streaming for process callbacks
1314+
* Opens SSE stream to container and routes events to callbacks
1315+
*/
1316+
private async startProcessCallbackStream(
1317+
processId: string,
1318+
options: ProcessOptions
1319+
): Promise<void> {
1320+
try {
1321+
const stream = await this.client.processes.streamProcessLogs(processId);
1322+
1323+
for await (const event of parseSSEStream<{
1324+
type: string;
1325+
data?: string;
1326+
exitCode?: number;
1327+
processId?: string;
1328+
}>(stream)) {
1329+
switch (event.type) {
1330+
case 'stdout':
1331+
if (event.data && options.onOutput) {
1332+
options.onOutput('stdout', event.data);
1333+
}
1334+
break;
1335+
case 'stderr':
1336+
if (event.data && options.onOutput) {
1337+
options.onOutput('stderr', event.data);
1338+
}
1339+
break;
1340+
case 'exit':
1341+
case 'complete':
1342+
if (options.onExit) {
1343+
options.onExit(event.exitCode ?? null);
1344+
}
1345+
return; // Stream complete
1346+
}
1347+
}
1348+
} catch (error) {
1349+
// Call onError if streaming fails
1350+
if (options.onError && error instanceof Error) {
1351+
options.onError(error);
1352+
}
1353+
// Don't rethrow - background streaming failure shouldn't crash the caller
1354+
this.logger.error(
1355+
'Background process streaming failed',
1356+
error instanceof Error ? error : new Error(String(error)),
1357+
{ processId }
1358+
);
1359+
}
1360+
}
1361+
13021362
async listProcesses(sessionId?: string): Promise<Process[]> {
13031363
const session = sessionId ?? (await this.ensureDefaultSession());
13041364
const response = await this.client.processes.listProcesses();

packages/shared/src/types.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,7 @@ export interface ExecEvent {
209209
result?: ExecResult;
210210
error?: string;
211211
sessionId?: string;
212+
pid?: number; // Present on 'start' event
212213
}
213214

214215
export interface LogEvent {

0 commit comments

Comments
 (0)