Skip to content

Commit 42b76c7

Browse files
committed
fix for streaming
1 parent b9f917c commit 42b76c7

File tree

5 files changed

+64
-4
lines changed

5 files changed

+64
-4
lines changed

cf-sandbox-bug

Lines changed: 0 additions & 1 deletion
This file was deleted.

packages/sandbox/src/sandbox.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1227,6 +1227,8 @@ export class Sandbox<Env = unknown> extends Container<Env> implements ISandbox {
12271227
// Command execution - delegate to internal session-aware methods
12281228
exec: (command, options) => this.execWithSession(command, sessionId, options),
12291229
execStream: (command, options) => this.execStreamWithSession(command, sessionId, options),
1230+
execStreamWithCallback: (command, onEvent, options) =>
1231+
this.execStreamWithCallback(command, onEvent, { ...options, sessionId }),
12301232

12311233
// Process management
12321234
startProcess: (command, options) => this.startProcess(command, options, sessionId),
@@ -1237,6 +1239,8 @@ export class Sandbox<Env = unknown> extends Container<Env> implements ISandbox {
12371239
cleanupCompletedProcesses: () => this.cleanupCompletedProcesses(),
12381240
getProcessLogs: (id) => this.getProcessLogs(id),
12391241
streamProcessLogs: (processId, options) => this.streamProcessLogs(processId, options),
1242+
streamProcessLogsWithCallback: (processId, onEvent, options) =>
1243+
this.streamProcessLogsWithCallback(processId, onEvent, options),
12401244

12411245
// File operations - pass sessionId via options or parameter
12421246
writeFile: (path, content, options) => this.writeFile(path, content, { ...options, sessionId }),

packages/shared/src/types.ts

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -583,7 +583,14 @@ export interface ExecutionSession {
583583
// Command execution
584584
exec(command: string, options?: ExecOptions): Promise<ExecResult>;
585585
execStream(command: string, options?: StreamOptions): Promise<ReadableStream<Uint8Array>>;
586-
586+
587+
// Callback-based streaming
588+
execStreamWithCallback(
589+
command: string,
590+
onEvent: (event: ExecEvent) => void | Promise<void>,
591+
options?: { signal?: AbortSignal }
592+
): Promise<void>;
593+
587594
// Background process management
588595
startProcess(command: string, options?: ProcessOptions): Promise<Process>;
589596
listProcesses(): Promise<Process[]>;
@@ -593,6 +600,11 @@ export interface ExecutionSession {
593600
cleanupCompletedProcesses(): Promise<number>;
594601
getProcessLogs(id: string): Promise<{ stdout: string; stderr: string; processId: string }>;
595602
streamProcessLogs(processId: string, options?: { signal?: AbortSignal }): Promise<ReadableStream<Uint8Array>>;
603+
streamProcessLogsWithCallback(
604+
processId: string,
605+
onEvent: (event: ExecEvent) => void | Promise<void>,
606+
options?: { signal?: AbortSignal }
607+
): Promise<void>;
596608

597609
// File operations
598610
writeFile(path: string, content: string, options?: { encoding?: string }): Promise<WriteFileResult>;

tests/e2e/streaming-operations-workflow.test.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -593,7 +593,7 @@ describe('Streaming Operations Workflow', () => {
593593

594594
// With callback-based streaming, it should complete successfully
595595
const streamResponse = await vi.waitFor(
596-
async () => fetchWithStartup(`${workerUrl}/api/execStream`, {
596+
async () => fetchWithStartup(`${workerUrl}/api/execStreamCallback`, {
597597
method: 'POST',
598598
headers,
599599
body: JSON.stringify({
@@ -641,7 +641,7 @@ describe('Streaming Operations Workflow', () => {
641641

642642
// This is the exact pattern that was failing before
643643
const streamResponse = await vi.waitFor(
644-
async () => fetchWithStartup(`${workerUrl}/api/execStream`, {
644+
async () => fetchWithStartup(`${workerUrl}/api/execStreamCallback`, {
645645
method: 'POST',
646646
headers,
647647
body: JSON.stringify({

tests/e2e/test-worker/index.ts

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,51 @@ export default {
8686
});
8787
}
8888

89+
// Command execution with callback-based streaming (for long-running commands)
90+
if (url.pathname === '/api/execStreamCallback' && request.method === 'POST') {
91+
console.log('[TestWorker] execStreamCallback called for command:', body.command);
92+
const startTime = Date.now();
93+
94+
// Create a TransformStream to convert callbacks to SSE
95+
const { readable, writable } = new TransformStream();
96+
const writer = writable.getWriter();
97+
const encoder = new TextEncoder();
98+
99+
// Start streaming in the background using callback-based method
100+
(async () => {
101+
try {
102+
await executor.execStreamWithCallback(
103+
body.command,
104+
async (event) => {
105+
// Convert event to SSE format
106+
const sseData = `data: ${JSON.stringify(event)}\n\n`;
107+
await writer.write(encoder.encode(sseData));
108+
}
109+
);
110+
} catch (error: any) {
111+
// Send error event
112+
const errorEvent = {
113+
type: 'error',
114+
timestamp: new Date().toISOString(),
115+
error: error.message,
116+
};
117+
await writer.write(encoder.encode(`data: ${JSON.stringify(errorEvent)}\n\n`));
118+
} finally {
119+
await writer.close();
120+
console.log('[TestWorker] Stream completed in', Date.now() - startTime, 'ms');
121+
}
122+
})();
123+
124+
// Return SSE stream
125+
return new Response(readable, {
126+
headers: {
127+
'Content-Type': 'text/event-stream',
128+
'Cache-Control': 'no-cache',
129+
'Connection': 'keep-alive',
130+
},
131+
});
132+
}
133+
89134
// Git clone
90135
if (url.pathname === '/api/git/clone' && request.method === 'POST') {
91136
await executor.gitCheckout(body.repoUrl, {

0 commit comments

Comments
 (0)