Skip to content

Commit b9f917c

Browse files
committed
test long running process
1 parent 24dbbbe commit b9f917c

File tree

5 files changed

+496
-1
lines changed

5 files changed

+496
-1
lines changed

cf-sandbox-bug

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Subproject commit 704b93c6976ebe52c1fbf4d4fec52bb545f4e8c2

packages/sandbox/src/sandbox.ts

Lines changed: 124 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -614,7 +614,24 @@ export class Sandbox<Env = unknown> extends Container<Env> implements ISandbox {
614614
}
615615

616616

617-
// Streaming methods - return ReadableStream for RPC compatibility
617+
/**
618+
* Execute a command and return a ReadableStream of SSE events.
619+
*
620+
* ⚠️ **Important**: If calling this method from outside the Sandbox Durable Object (via RPC),
621+
* streams may disconnect after ~40 seconds for long-running commands. For RPC calls,
622+
* use `execStreamWithCallback()` instead which processes the stream internally.
623+
*
624+
* **Use this method when:**
625+
* - You're inside a Worker endpoint and need to proxy/pipe the stream to an HTTP response
626+
* - You're calling this directly from within the Sandbox DO
627+
*
628+
* **Use `execStreamWithCallback()` when:**
629+
* - Calling from another Durable Object via RPC
630+
* - Commands may run longer than 40 seconds
631+
* - You need reliable event delivery
632+
*
633+
* @see execStreamWithCallback for RPC-safe streaming
634+
*/
618635
async execStream(command: string, options?: StreamOptions): Promise<ReadableStream<Uint8Array>> {
619636
// Check for cancellation
620637
if (options?.signal?.aborted) {
@@ -640,6 +657,24 @@ export class Sandbox<Env = unknown> extends Container<Env> implements ISandbox {
640657
return this.wrapStreamWithActivityRenewal(stream);
641658
}
642659

660+
/**
661+
* Stream logs from a background process as a ReadableStream.
662+
*
663+
* ⚠️ **Important**: If calling this method from outside the Sandbox Durable Object (via RPC),
664+
* streams may disconnect after ~40 seconds for long-running processes. For RPC calls,
665+
* use `streamProcessLogsWithCallback()` instead which processes the stream internally.
666+
*
667+
* **Use this method when:**
668+
* - You're inside a Worker endpoint and need to proxy/pipe the stream to an HTTP response
669+
* - You're calling this directly from within the Sandbox DO
670+
*
671+
* **Use `streamProcessLogsWithCallback()` when:**
672+
* - Calling from another Durable Object via RPC
673+
* - Process may run longer than 40 seconds
674+
* - You need reliable event delivery
675+
*
676+
* @see streamProcessLogsWithCallback for RPC-safe streaming
677+
*/
643678
async streamProcessLogs(processId: string, options?: { signal?: AbortSignal }): Promise<ReadableStream<Uint8Array>> {
644679
// Check for cancellation
645680
if (options?.signal?.aborted) {
@@ -806,6 +841,94 @@ export class Sandbox<Env = unknown> extends Container<Env> implements ISandbox {
806841
});
807842
}
808843

844+
/**
845+
* Execute a command with streaming output handled internally via callback.
846+
*
847+
* @param command - The command to execute
848+
* @param onEvent - Callback function that receives each ExecEvent as it arrives
849+
* @param options - Optional execution options including sessionId and signal
850+
* @returns Promise that resolves when the command completes
851+
*/
852+
async execStreamWithCallback(
853+
command: string,
854+
onEvent: (event: ExecEvent) => void | Promise<void>,
855+
options?: { sessionId?: string; signal?: AbortSignal }
856+
): Promise<void> {
857+
// Check for cancellation
858+
if (options?.signal?.aborted) {
859+
throw new Error('Operation was aborted');
860+
}
861+
862+
const session = options?.sessionId ?? await this.ensureDefaultSession();
863+
864+
// Get the stream - this happens INSIDE the Sandbox DO, so it's a direct HTTP fetch
865+
const stream = await this.client.commands.executeStream(command, session);
866+
867+
// Parse and process the stream internally
868+
try {
869+
for await (const event of parseSSEStream<ExecEvent>(stream)) {
870+
// Check for cancellation during streaming
871+
if (options?.signal?.aborted) {
872+
throw new Error('Operation was aborted');
873+
}
874+
875+
// Renew activity timeout periodically (if available)
876+
if (this.renewActivityTimeout) {
877+
this.renewActivityTimeout();
878+
}
879+
880+
// Call the event callback
881+
await onEvent(event);
882+
}
883+
} catch (error) {
884+
this.logger.error('Error in execStreamWithCallback', error instanceof Error ? error : new Error(String(error)));
885+
throw error;
886+
}
887+
}
888+
889+
/**
890+
* Stream process logs with output handled internally via callback.
891+
*
892+
* @param processId - The ID of the process to stream logs from
893+
* @param onEvent - Callback function that receives each ExecEvent as it arrives
894+
* @param options - Optional signal for cancellation
895+
* @returns Promise that resolves when the stream completes
896+
*/
897+
async streamProcessLogsWithCallback(
898+
processId: string,
899+
onEvent: (event: ExecEvent) => void | Promise<void>,
900+
options?: { signal?: AbortSignal }
901+
): Promise<void> {
902+
// Check for cancellation
903+
if (options?.signal?.aborted) {
904+
throw new Error('Operation was aborted');
905+
}
906+
907+
// Get the stream - this happens INSIDE the Sandbox DO, so it's a direct HTTP fetch
908+
const stream = await this.client.processes.streamProcessLogs(processId);
909+
910+
// Parse and process the stream internally
911+
try {
912+
for await (const event of parseSSEStream<ExecEvent>(stream)) {
913+
// Check for cancellation during streaming
914+
if (options?.signal?.aborted) {
915+
throw new Error('Operation was aborted');
916+
}
917+
918+
// Renew activity timeout periodically (if available)
919+
if (this.renewActivityTimeout) {
920+
this.renewActivityTimeout();
921+
}
922+
923+
// Call the event callback
924+
await onEvent(event);
925+
}
926+
} catch (error) {
927+
this.logger.error('Error in streamProcessLogsWithCallback', error instanceof Error ? error : new Error(String(error)));
928+
throw error;
929+
}
930+
}
931+
809932
async gitCheckout(
810933
repoUrl: string,
811934
options: { branch?: string; targetDir?: string; sessionId?: string }

0 commit comments

Comments
 (0)