diff --git a/.changeset/fix-process-callbacks.md b/.changeset/fix-process-callbacks.md new file mode 100644 index 00000000..0f1f9b77 --- /dev/null +++ b/.changeset/fix-process-callbacks.md @@ -0,0 +1,5 @@ +--- +'@cloudflare/sandbox': patch +--- + +Fix process callbacks, PID capture, and getLogs race condition for fast commands diff --git a/packages/sandbox-container/src/core/types.ts b/packages/sandbox-container/src/core/types.ts index 0a63891d..69396192 100644 --- a/packages/sandbox-container/src/core/types.ts +++ b/packages/sandbox-container/src/core/types.ts @@ -246,6 +246,8 @@ export interface ProcessRecord { sessionId: string; commandId: string; }; + // Promise that resolves when all streaming events have been processed + streamingComplete?: Promise; // For isolation layer (file-based IPC) stdoutFile?: string; stderrFile?: string; diff --git a/packages/sandbox-container/src/services/process-service.ts b/packages/sandbox-container/src/services/process-service.ts index 3f0fb956..b4123d5b 100644 --- a/packages/sandbox-container/src/services/process-service.ts +++ b/packages/sandbox-container/src/services/process-service.ts @@ -150,7 +150,9 @@ export class ProcessService { command, async (event) => { // Route events to process record listeners - if (event.type === 'stdout' && event.data) { + if (event.type === 'start' && event.pid !== undefined) { + await this.store.update(processRecord.id, { pid: event.pid }); + } else if (event.type === 'stdout' && event.data) { processRecord.stdout += event.data; processRecord.outputListeners.forEach((listener) => { listener('stdout', event.data!); @@ -214,14 +216,15 @@ export class ProcessService { return streamResult as ServiceResult; } - // Command is now tracked and first event processed - safe to return process record - // Continue streaming in background without blocking - streamResult.data.continueStreaming.catch((error) => { - this.logger.error('Failed to execute streaming command', error, { - processId: processRecord.id, - command + // Store streaming promise so getLogs() can await it for completed processes + // This ensures all output is captured before returning logs + processRecord.streamingComplete = + streamResult.data.continueStreaming.catch((error) => { + this.logger.error('Failed to execute streaming command', error, { + processId: processRecord.id, + command + }); }); - }); return { success: true, @@ -252,9 +255,9 @@ export class ProcessService { async getProcess(id: string): Promise> { try { - const process = await this.store.get(id); + const processRecord = await this.store.get(id); - if (!process) { + if (!processRecord) { return { success: false, error: { @@ -267,9 +270,42 @@ export class ProcessService { }; } + // Wait for streaming to finish to ensure all output is captured + // We use three indicators to decide whether to wait: + // 1. Terminal status: command has finished, wait for streaming callbacks + // 2. PID check: if process is no longer alive, command finished, wait for streaming + // 3. No streamingComplete: process was read from disk, output is complete + // + // For long-running processes (servers), PID is alive and status is 'running', + // so we return current output without blocking. + if (processRecord.streamingComplete) { + const isTerminal = ['completed', 'failed', 'killed', 'error'].includes( + processRecord.status + ); + + // Check if the subprocess is still alive (deterministic check for fast commands) + // If PID is set and subprocess is dead, the command has finished + let commandFinished = false; + if (processRecord.pid !== undefined) { + try { + // Signal 0 doesn't actually send a signal, just checks if process exists + process.kill(processRecord.pid, 0); + // Subprocess is still running + } catch { + // Subprocess is not running (either finished or doesn't exist) + commandFinished = true; + } + } + + // Wait if status is terminal OR command has finished (for fast commands) + if (isTerminal || commandFinished) { + await processRecord.streamingComplete; + } + } + return { success: true, - data: process + data: processRecord }; } catch (error) { const errorMessage = diff --git a/packages/sandbox-container/src/services/process-store.ts b/packages/sandbox-container/src/services/process-store.ts index 3a4620a8..f197965c 100644 --- a/packages/sandbox-container/src/services/process-store.ts +++ b/packages/sandbox-container/src/services/process-store.ts @@ -61,16 +61,16 @@ export class ProcessStore { return; } - const updated = { ...existing, ...data }; - this.processes.set(id, updated); + // Mutate in place to preserve reference held by ProcessService for event routing + Object.assign(existing, data); // Persist terminal states to disk and free memory const isTerminal = ['completed', 'failed', 'killed', 'error'].includes( - updated.status + existing.status ); if (isTerminal) { try { - await this.writeProcessFile(id, updated); + await this.writeProcessFile(id, existing); } catch (error) { // Write failed, still delete to prevent memory leak // Explicit tradeoff: container stability > process history diff --git a/packages/sandbox-container/src/services/session-manager.ts b/packages/sandbox-container/src/services/session-manager.ts index 4fbd2150..7c509dab 100644 --- a/packages/sandbox-container/src/services/session-manager.ts +++ b/packages/sandbox-container/src/services/session-manager.ts @@ -220,18 +220,38 @@ export class SessionManager { const session = sessionResult.data; - // Get async generator const generator = session.execStream(command, { commandId, cwd, env }); - // CRITICAL: Await first event to ensure command is tracked before returning - // This prevents race condition where killCommand() is called before trackCommand() + // Process 'start' event synchronously to capture PID before returning + // All other events stream in background via continueStreaming promise + // getLogs() awaits continueStreaming for completed processes to ensure + // all output is captured (deterministic, no timing heuristics) const firstResult = await generator.next(); - if (!firstResult.done) { - await onEvent(firstResult.value); + if (firstResult.done) { + return { + success: true, + data: { continueStreaming: Promise.resolve() } + }; + } + + await onEvent(firstResult.value); + + // If already complete/error, drain remaining events synchronously + if ( + firstResult.value.type === 'complete' || + firstResult.value.type === 'error' + ) { + for await (const event of generator) { + await onEvent(event); + } + return { + success: true, + data: { continueStreaming: Promise.resolve() } + }; } - // Create background task for remaining events + // Continue streaming remaining events in background const continueStreaming = (async () => { try { for await (const event of generator) { diff --git a/packages/sandbox-container/src/session.ts b/packages/sandbox-container/src/session.ts index cc9328f2..b5c58f03 100644 --- a/packages/sandbox-container/src/session.ts +++ b/packages/sandbox-container/src/session.ts @@ -310,6 +310,10 @@ export class Session { const logFile = join(this.sessionDir!, `${commandId}.log`); const exitCodeFile = join(this.sessionDir!, `${commandId}.exit`); const pidFile = join(this.sessionDir!, `${commandId}.pid`); + const labelersDoneFile = join( + this.sessionDir!, + `${commandId}.labelers.done` + ); this.logger.info('Streaming command execution started', { sessionId: this.id, @@ -340,10 +344,22 @@ export class Session { throw new Error('Shell stdin is not available'); } + // Wait for PID file to be created (bash script writes it after starting command) + const pid = await this.waitForPidFile(pidFile); + + if (pid === undefined) { + this.logger.warn('PID file not created within timeout', { + sessionId: this.id, + commandId, + pidFile + }); + } + yield { type: 'start', timestamp: new Date().toISOString(), - command + command, + pid }; // Hybrid approach: poll log file until exit code is written @@ -400,14 +416,41 @@ export class Session { await Bun.sleep(CONFIG.STREAM_CHUNK_DELAY_MS); } - // Read final chunks + /* + * Wait for labelers done marker file. + * The exit code file is written by the command subshell, but labelers + * run in parallel background processes. The background monitor creates + * the labelers done file after waiting for labelers to finish. + */ + const maxWaitMs = 5000; + const startWait = Date.now(); + let labelersDone = false; + while (Date.now() - startWait < maxWaitMs) { + const doneFile = Bun.file(labelersDoneFile); + if (await doneFile.exists()) { + labelersDone = true; + break; + } + await Bun.sleep(CONFIG.STREAM_CHUNK_DELAY_MS); + } + + if (!labelersDone) { + this.logger.warn('Output capture timeout - logs may be incomplete', { + commandId, + sessionId: this.id, + timeoutMs: maxWaitMs + }); + } + + // Read final chunks from log file after labelers are done const file = Bun.file(logFile); if (await file.exists()) { - const content = await file.text(); - const newContent = content.slice(position); + const logContent = await file.text(); + const finalContent = logContent.slice(position); - if (newContent) { - const lines = newContent.split('\n'); + // Process final chunks + if (finalContent) { + const lines = finalContent.split('\n'); for (const line of lines) { if (!line) continue; @@ -428,6 +471,13 @@ export class Session { } } + // Clean up labelers done file + try { + await rm(labelersDoneFile, { force: true }); + } catch { + // Ignore cleanup errors + } + // Parse exit code (already read during polling loop) const exitCode = parseInt(exitCodeContent, 10); if (Number.isNaN(exitCode)) { @@ -629,6 +679,7 @@ export class Session { const stdoutPipe = join(this.sessionDir!, `${cmdId}.stdout.pipe`); const stderrPipe = join(this.sessionDir!, `${cmdId}.stderr.pipe`); const pidFile = join(this.sessionDir!, `${cmdId}.pid`); + const labelersDoneFile = join(this.sessionDir!, `${cmdId}.labelers.done`); // Escape paths for safe shell usage const safeStdoutPipe = this.escapeShellPath(stdoutPipe); @@ -637,6 +688,7 @@ export class Session { const safeExitCodeFile = this.escapeShellPath(exitCodeFile); const safeSessionDir = this.escapeShellPath(this.sessionDir!); const safePidFile = this.escapeShellPath(pidFile); + const safeLabelersDoneFile = this.escapeShellPath(labelersDoneFile); const indentLines = (input: string, spaces: number) => { const prefix = ' '.repeat(spaces); @@ -722,6 +774,7 @@ export class Session { script += ` (\n`; script += ` wait "$r1" "$r2" 2>/dev/null\n`; script += ` rm -f "$sp" "$ep"\n`; + script += ` touch ${safeLabelersDoneFile}\n`; script += ` ) &\n`; script += ` # Restore directory immediately\n`; script += ` cd "$PREV_DIR"\n`; @@ -745,6 +798,7 @@ export class Session { script += ` (\n`; script += ` wait "$r1" "$r2" 2>/dev/null\n`; script += ` rm -f "$sp" "$ep"\n`; + script += ` touch ${safeLabelersDoneFile}\n`; script += ` ) &\n`; } } else { @@ -1003,6 +1057,35 @@ export class Session { } } + /** + * Wait for PID file to be created and return the PID + * Returns undefined if file doesn't appear within timeout + */ + private async waitForPidFile( + pidFile: string, + timeoutMs: number = 1000 + ): Promise { + const startTime = Date.now(); + + while (Date.now() - startTime < timeoutMs) { + try { + const file = Bun.file(pidFile); + if (await file.exists()) { + const content = await file.text(); + const pid = parseInt(content.trim(), 10); + if (!Number.isNaN(pid)) { + return pid; + } + } + } catch { + // Ignore errors, keep polling + } + await Bun.sleep(10); // Poll every 10ms + } + + return undefined; + } + /** * Escape shell path for safe usage in bash scripts */ diff --git a/packages/sandbox/src/sandbox.ts b/packages/sandbox/src/sandbox.ts index b5014691..1c95c18b 100644 --- a/packages/sandbox/src/sandbox.ts +++ b/packages/sandbox/src/sandbox.ts @@ -1289,6 +1289,16 @@ export class Sandbox extends Container implements ISandbox { options.onStart(processObj); } + // Start background streaming if output/exit callbacks are provided + if (options?.onOutput || options?.onExit) { + // Fire and forget - don't await, let it run in background + this.startProcessCallbackStream(response.processId, options).catch( + () => { + // Error already handled in startProcessCallbackStream + } + ); + } + return processObj; } catch (error) { if (options?.onError && error instanceof Error) { @@ -1299,6 +1309,56 @@ export class Sandbox extends Container implements ISandbox { } } + /** + * Start background streaming for process callbacks + * Opens SSE stream to container and routes events to callbacks + */ + private async startProcessCallbackStream( + processId: string, + options: ProcessOptions + ): Promise { + try { + const stream = await this.client.processes.streamProcessLogs(processId); + + for await (const event of parseSSEStream<{ + type: string; + data?: string; + exitCode?: number; + processId?: string; + }>(stream)) { + switch (event.type) { + case 'stdout': + if (event.data && options.onOutput) { + options.onOutput('stdout', event.data); + } + break; + case 'stderr': + if (event.data && options.onOutput) { + options.onOutput('stderr', event.data); + } + break; + case 'exit': + case 'complete': + if (options.onExit) { + options.onExit(event.exitCode ?? null); + } + return; // Stream complete + } + } + } catch (error) { + // Call onError if streaming fails + if (options.onError && error instanceof Error) { + options.onError(error); + } + // Don't rethrow - background streaming failure shouldn't crash the caller + this.logger.error( + 'Background process streaming failed', + error instanceof Error ? error : new Error(String(error)), + { processId } + ); + } + } + async listProcesses(sessionId?: string): Promise { const session = sessionId ?? (await this.ensureDefaultSession()); const response = await this.client.processes.listProcesses(); diff --git a/packages/shared/src/types.ts b/packages/shared/src/types.ts index 50238211..5dc0bb71 100644 --- a/packages/shared/src/types.ts +++ b/packages/shared/src/types.ts @@ -209,6 +209,7 @@ export interface ExecEvent { result?: ExecResult; error?: string; sessionId?: string; + pid?: number; // Present on 'start' event } export interface LogEvent { diff --git a/tests/e2e/process-lifecycle-workflow.test.ts b/tests/e2e/process-lifecycle-workflow.test.ts index 866660f0..a8a02a01 100644 --- a/tests/e2e/process-lifecycle-workflow.test.ts +++ b/tests/e2e/process-lifecycle-workflow.test.ts @@ -225,11 +225,10 @@ describe('Process Lifecycle Workflow', () => { }); }, 90000); - test('should get process logs after execution', async () => { + test('should capture PID and logs immediately for fast commands', async () => { const sandboxId = createSandboxId(); const headers = createTestHeaders(sandboxId); - // Start a process that outputs to stdout const startResponse = await fetch(`${workerUrl}/api/process/start`, { method: 'POST', headers, @@ -238,13 +237,15 @@ describe('Process Lifecycle Workflow', () => { }) }); + expect(startResponse.status).toBe(200); const startData = (await startResponse.json()) as Process; const processId = startData.id; - // Wait for process to complete - await new Promise((resolve) => setTimeout(resolve, 2000)); + // PID should be available immediately + expect(startData.pid).toBeDefined(); + expect(typeof startData.pid).toBe('number'); - // Get process logs + // Logs should be available immediately for fast commands const logsResponse = await fetch( `${workerUrl}/api/process/${processId}/logs`, {