From 0091e965d4939611a9e3ac89e964c07d4f4f3049 Mon Sep 17 00:00:00 2001 From: katereznykova Date: Thu, 11 Dec 2025 12:26:31 +0000 Subject: [PATCH 1/7] testing circuit breaker --- packages/sandbox/src/clients/base-client.ts | 65 ++- .../sandbox/src/clients/circuit-breaker.ts | 238 +++++++++ packages/sandbox/src/clients/index.ts | 15 +- packages/sandbox/src/clients/request-queue.ts | 244 +++++++++ .../sandbox/src/clients/sandbox-client.ts | 67 ++- packages/sandbox/src/clients/types.ts | 38 ++ packages/sandbox/src/index.ts | 10 + .../sandbox/tests/circuit-breaker.test.ts | 273 ++++++++++ packages/sandbox/tests/request-queue.test.ts | 503 ++++++++++++++++++ 9 files changed, 1444 insertions(+), 9 deletions(-) create mode 100644 packages/sandbox/src/clients/circuit-breaker.ts create mode 100644 packages/sandbox/src/clients/request-queue.ts create mode 100644 packages/sandbox/tests/circuit-breaker.test.ts create mode 100644 packages/sandbox/tests/request-queue.test.ts diff --git a/packages/sandbox/src/clients/base-client.ts b/packages/sandbox/src/clients/base-client.ts index d9e0242c..93a85086 100644 --- a/packages/sandbox/src/clients/base-client.ts +++ b/packages/sandbox/src/clients/base-client.ts @@ -1,9 +1,10 @@ import type { Logger } from '@repo/shared'; import { createNoOpLogger } from '@repo/shared'; -import { getHttpStatus } from '@repo/shared/errors'; import type { ErrorResponse as NewErrorResponse } from '../errors'; import { createErrorFromResponse, ErrorCode } from '../errors'; import type { SandboxError } from '../errors/classes'; +import type { CircuitBreaker } from './circuit-breaker'; +import type { RequestQueue } from './request-queue'; import type { HttpClientOptions, ResponseHandler } from './types'; // Container startup retry configuration @@ -17,26 +18,70 @@ export abstract class BaseHttpClient { protected baseUrl: string; protected options: HttpClientOptions; protected logger: Logger; + protected circuitBreaker?: CircuitBreaker; + protected requestQueue?: RequestQueue; constructor(options: HttpClientOptions = {}) { this.options = options; this.logger = options.logger ?? createNoOpLogger(); this.baseUrl = this.options.baseUrl!; + + // Use injected instances (shared across all clients in SandboxClient) + this.circuitBreaker = options._circuitBreaker; + this.requestQueue = options._requestQueue; } /** - * Core HTTP request method with automatic retry for container startup delays - * Retries both 503 (provisioning) and 500 (startup failure) errors when they're container-related + * Core HTTP request method with resilience features + * + * Request flow: + * 1. Circuit breaker check - fail fast if circuit is open + * 2. Request queue - wait if at concurrency limit + * 3. Execute fetch with container startup retry logic + * 4. Record success/failure for circuit breaker */ protected async doFetch( path: string, options?: RequestInit + ): Promise { + // Step 1: Check circuit breaker (fail fast if open) + if (this.circuitBreaker && !this.circuitBreaker.canExecute()) { + const stats = this.circuitBreaker.getStats(); + const { CircuitOpenError } = await import('./circuit-breaker'); + throw new CircuitOpenError(stats.remainingRecoveryMs ?? 0); + } + + // Step 2: Execute through request queue if available + const executeRequest = () => this.doFetchWithRetry(path, options); + + if (this.requestQueue) { + return this.requestQueue.execute(executeRequest); + } + + return executeRequest(); + } + + /** + * Internal fetch with container startup retry logic + * Retries both 503 (provisioning) and 500 (startup failure) errors when they're container-related + */ + private async doFetchWithRetry( + path: string, + options?: RequestInit ): Promise { const startTime = Date.now(); let attempt = 0; while (true) { - const response = await this.executeFetch(path, options); + let response: Response; + + try { + response = await this.executeFetch(path, options); + } catch (error) { + // Network error - record as failure for circuit breaker + this.circuitBreaker?.recordFailure(); + throw error; + } // Check if this is a retryable container error (both 500 and 503) const shouldRetry = await this.isRetryableContainerError(response); @@ -62,17 +107,25 @@ export abstract class BaseHttpClient { continue; } - // Timeout exhausted + // Timeout exhausted - record as failure this.logger.error( 'Container failed to become ready', new Error( `Failed after ${attempt + 1} attempts over ${Math.floor(elapsed / 1000)}s` ) ); + this.circuitBreaker?.recordFailure(); return response; } - // Not a retryable error or request succeeded + // Record success/failure for circuit breaker based on response status + if (response.ok) { + this.circuitBreaker?.recordSuccess(); + } else if (response.status >= 500) { + // Only count server errors as failures for circuit breaker + this.circuitBreaker?.recordFailure(); + } + return response; } } diff --git a/packages/sandbox/src/clients/circuit-breaker.ts b/packages/sandbox/src/clients/circuit-breaker.ts new file mode 100644 index 00000000..c5b6721c --- /dev/null +++ b/packages/sandbox/src/clients/circuit-breaker.ts @@ -0,0 +1,238 @@ +/** + * Circuit breaker states + */ +export type CircuitState = 'closed' | 'open' | 'half-open'; + +/** + * Configuration options for the circuit breaker + */ +export interface CircuitBreakerOptions { + /** + * Number of failures in the window before opening the circuit + * @default 5 + */ + failureThreshold?: number; + + /** + * Time window in milliseconds for counting failures + * @default 30000 (30 seconds) + */ + failureWindow?: number; + + /** + * Time in milliseconds to wait before attempting recovery (half-open state) + * @default 10000 (10 seconds) + */ + recoveryTimeout?: number; + + /** + * Number of successful requests needed in half-open state to close the circuit + * @default 2 + */ + successThreshold?: number; + + /** + * Callback when circuit state changes + */ + onStateChange?: (state: CircuitState, previousState: CircuitState) => void; +} + +/** + * Error thrown when circuit is open and requests are rejected + */ +export class CircuitOpenError extends Error { + readonly name = 'CircuitOpenError'; + readonly remainingMs: number; + + constructor(remainingMs: number) { + super( + `Circuit breaker is open. Service is unavailable. Retry after ${Math.ceil(remainingMs / 1000)}s.` + ); + this.remainingMs = remainingMs; + } +} + +/** + * Circuit breaker implementation to protect against cascading failures + * + * States: + * - CLOSED: Normal operation, requests pass through + * - OPEN: Failures exceeded threshold, requests fail fast + * - HALF-OPEN: Testing if service recovered, limited requests allowed + * + * The circuit breaker tracks failures in a sliding window. When failures + * exceed the threshold, it opens and rejects requests immediately. After + * a recovery timeout, it enters half-open state to test if the service + * has recovered. + */ +export class CircuitBreaker { + private state: CircuitState = 'closed'; + private failures: number[] = []; // Timestamps of recent failures + private successCount = 0; // Successes in half-open state + private openedAt = 0; // When circuit opened + private readonly options: Required< + Omit + > & { + onStateChange?: CircuitBreakerOptions['onStateChange']; + }; + + constructor(options: CircuitBreakerOptions = {}) { + this.options = { + failureThreshold: options.failureThreshold ?? 5, + failureWindow: options.failureWindow ?? 30_000, + recoveryTimeout: options.recoveryTimeout ?? 10_000, + successThreshold: options.successThreshold ?? 2, + onStateChange: options.onStateChange + }; + } + + /** + * Get current circuit state + */ + getState(): CircuitState { + return this.state; + } + + /** + * Check if a request should be allowed through + * @returns true if request can proceed, false if circuit is open + * @throws CircuitOpenError if circuit is open (prefer using canExecute for non-throwing check) + */ + canExecute(): boolean { + this.cleanupOldFailures(); + + switch (this.state) { + case 'closed': + return true; + + case 'open': { + const elapsed = Date.now() - this.openedAt; + if (elapsed >= this.options.recoveryTimeout) { + // Transition to half-open + this.setState('half-open'); + return true; + } + return false; + } + + case 'half-open': + // Allow limited requests through for testing + return true; + } + } + + /** + * Execute a request through the circuit breaker + * Tracks success/failure and manages state transitions + */ + async execute(fn: () => Promise): Promise { + if (!this.canExecute()) { + const remaining = + this.options.recoveryTimeout - (Date.now() - this.openedAt); + throw new CircuitOpenError(remaining); + } + + try { + const result = await fn(); + this.recordSuccess(); + return result; + } catch (error) { + this.recordFailure(); + throw error; + } + } + + /** + * Record a successful request + */ + recordSuccess(): void { + if (this.state === 'half-open') { + this.successCount++; + if (this.successCount >= this.options.successThreshold) { + // Service has recovered + this.setState('closed'); + this.failures = []; + this.successCount = 0; + } + } + } + + /** + * Record a failed request + * Only counts failures for 5xx errors (server errors) + */ + recordFailure(): void { + const now = Date.now(); + this.failures.push(now); + this.cleanupOldFailures(); + + if (this.state === 'half-open') { + // Any failure in half-open immediately reopens + this.setState('open'); + this.openedAt = now; + this.successCount = 0; + } else if (this.state === 'closed') { + if (this.failures.length >= this.options.failureThreshold) { + this.setState('open'); + this.openedAt = now; + } + } + } + + /** + * Reset the circuit breaker to closed state + * Useful for manual intervention or testing + */ + reset(): void { + this.setState('closed'); + this.failures = []; + this.successCount = 0; + this.openedAt = 0; + } + + /** + * Get statistics about the circuit breaker + */ + getStats(): { + state: CircuitState; + failureCount: number; + successCount: number; + remainingRecoveryMs: number | null; + } { + this.cleanupOldFailures(); + + let remainingRecoveryMs: number | null = null; + if (this.state === 'open') { + remainingRecoveryMs = Math.max( + 0, + this.options.recoveryTimeout - (Date.now() - this.openedAt) + ); + } + + return { + state: this.state, + failureCount: this.failures.length, + successCount: this.successCount, + remainingRecoveryMs + }; + } + + /** + * Remove failures outside the sliding window + */ + private cleanupOldFailures(): void { + const cutoff = Date.now() - this.options.failureWindow; + this.failures = this.failures.filter((timestamp) => timestamp > cutoff); + } + + /** + * Set state and notify listener + */ + private setState(newState: CircuitState): void { + if (this.state !== newState) { + const previousState = this.state; + this.state = newState; + this.options.onStateChange?.(newState, previousState); + } + } +} diff --git a/packages/sandbox/src/clients/index.ts b/packages/sandbox/src/clients/index.ts index 840ef1ce..72231c43 100644 --- a/packages/sandbox/src/clients/index.ts +++ b/packages/sandbox/src/clients/index.ts @@ -1,8 +1,14 @@ // Main client exports +// Resilience exports +export { + CircuitBreaker, + type CircuitBreakerOptions, + CircuitOpenError, + type CircuitState +} from './circuit-breaker'; // Command client types export type { ExecuteRequest, ExecuteResponse } from './command-client'; - // Domain-specific clients export { CommandClient } from './command-client'; // File client types @@ -40,6 +46,12 @@ export type { StartProcessRequest } from './process-client'; export { ProcessClient } from './process-client'; +export { + QueueFullError, + QueueTimeoutError, + RequestQueue, + type RequestQueueOptions +} from './request-queue'; export { SandboxClient } from './sandbox-client'; // Types and interfaces export type { @@ -48,6 +60,7 @@ export type { ErrorResponse, HttpClientOptions, RequestConfig, + ResilienceOptions, ResponseHandler, SessionRequest } from './types'; diff --git a/packages/sandbox/src/clients/request-queue.ts b/packages/sandbox/src/clients/request-queue.ts new file mode 100644 index 00000000..11c57dc2 --- /dev/null +++ b/packages/sandbox/src/clients/request-queue.ts @@ -0,0 +1,244 @@ +/** + * Configuration options for the request queue + */ +export interface RequestQueueOptions { + /** + * Maximum number of concurrent requests allowed + * @default 10 + */ + maxConcurrent?: number; + + /** + * Maximum number of requests waiting in queue + * When exceeded, oldest requests are rejected + * @default 100 + */ + maxQueueSize?: number; + + /** + * Timeout for requests waiting in queue (milliseconds) + * @default 30000 (30 seconds) + */ + queueTimeout?: number; + + /** + * Callback when a request is queued + */ + onQueued?: (queueLength: number) => void; + + /** + * Callback when a request is dequeued + */ + onDequeued?: (waitTime: number) => void; +} + +/** + * Error thrown when queue is full and request is rejected + */ +export class QueueFullError extends Error { + readonly name = 'QueueFullError'; + readonly queueSize: number; + + constructor(queueSize: number) { + super( + `Request queue is full (${queueSize} pending requests). Service is overloaded.` + ); + this.queueSize = queueSize; + } +} + +/** + * Error thrown when request times out waiting in queue + */ +export class QueueTimeoutError extends Error { + readonly name = 'QueueTimeoutError'; + readonly waitTime: number; + + constructor(waitTime: number) { + super( + `Request timed out after ${Math.ceil(waitTime / 1000)}s waiting in queue.` + ); + this.waitTime = waitTime; + } +} + +/** + * Queued request with its resolver + */ +interface QueuedRequest { + execute: () => Promise; + resolve: (value: T) => void; + reject: (error: unknown) => void; + queuedAt: number; + timeoutId?: ReturnType; +} + +/** + * Request queue with concurrency limiting + * + * Provides backpressure by limiting the number of concurrent requests + * and queuing excess requests. This smooths out traffic bursts and + * prevents overwhelming downstream services. + * + * Features: + * - Configurable concurrency limit + * - Queue size limit to prevent memory exhaustion + * - Timeout for queued requests + * - FIFO ordering for fairness + */ +export class RequestQueue { + private readonly options: Required< + Omit + > & { + onQueued?: RequestQueueOptions['onQueued']; + onDequeued?: RequestQueueOptions['onDequeued']; + }; + private activeCount = 0; + private readonly queue: QueuedRequest[] = []; + + constructor(options: RequestQueueOptions = {}) { + this.options = { + maxConcurrent: options.maxConcurrent ?? 10, + maxQueueSize: options.maxQueueSize ?? 100, + queueTimeout: options.queueTimeout ?? 30_000, + onQueued: options.onQueued, + onDequeued: options.onDequeued + }; + } + + /** + * Execute a request through the queue + * Returns immediately if under concurrency limit, otherwise queues + */ + async execute(fn: () => Promise): Promise { + // Fast path: under concurrency limit + if (this.activeCount < this.options.maxConcurrent) { + return this.executeNow(fn); + } + + // Need to queue + return this.enqueue(fn); + } + + /** + * Execute immediately, tracking active count + */ + private async executeNow(fn: () => Promise): Promise { + this.activeCount++; + + try { + return await fn(); + } finally { + this.activeCount--; + this.processQueue(); + } + } + + /** + * Add request to queue + */ + private enqueue(fn: () => Promise): Promise { + // Check queue size limit + if (this.queue.length >= this.options.maxQueueSize) { + throw new QueueFullError(this.queue.length); + } + + return new Promise((resolve, reject) => { + const queuedAt = Date.now(); + + const request: QueuedRequest = { + execute: fn, + resolve: resolve as (value: unknown) => void, + reject, + queuedAt + }; + + // Set up timeout + request.timeoutId = setTimeout(() => { + // Remove from queue + const index = this.queue.indexOf(request as QueuedRequest); + if (index !== -1) { + this.queue.splice(index, 1); + reject(new QueueTimeoutError(Date.now() - queuedAt)); + } + }, this.options.queueTimeout); + + this.queue.push(request as QueuedRequest); + this.options.onQueued?.(this.queue.length); + }); + } + + /** + * Process next request from queue if capacity available + */ + private processQueue(): void { + if (this.queue.length === 0) { + return; + } + + if (this.activeCount >= this.options.maxConcurrent) { + return; + } + + const request = this.queue.shift(); + if (!request) { + return; + } + + // Clear timeout + if (request.timeoutId) { + clearTimeout(request.timeoutId); + } + + const waitTime = Date.now() - request.queuedAt; + this.options.onDequeued?.(waitTime); + + // Execute the request + this.executeNow(request.execute) + .then(request.resolve) + .catch(request.reject); + } + + /** + * Get current queue statistics + */ + getStats(): { + activeCount: number; + queueLength: number; + maxConcurrent: number; + maxQueueSize: number; + } { + return { + activeCount: this.activeCount, + queueLength: this.queue.length, + maxConcurrent: this.options.maxConcurrent, + maxQueueSize: this.options.maxQueueSize + }; + } + + /** + * Check if queue has capacity for new requests + */ + hasCapacity(): boolean { + return ( + this.activeCount < this.options.maxConcurrent || + this.queue.length < this.options.maxQueueSize + ); + } + + /** + * Clear the queue, rejecting all pending requests + * @param reason - Error message for rejected requests + */ + clear(reason = 'Queue cleared'): void { + while (this.queue.length > 0) { + const request = this.queue.shift(); + if (request) { + if (request.timeoutId) { + clearTimeout(request.timeoutId); + } + request.reject(new Error(reason)); + } + } + } +} diff --git a/packages/sandbox/src/clients/sandbox-client.ts b/packages/sandbox/src/clients/sandbox-client.ts index fa1599eb..1387a5e2 100644 --- a/packages/sandbox/src/clients/sandbox-client.ts +++ b/packages/sandbox/src/clients/sandbox-client.ts @@ -1,15 +1,20 @@ +import { CircuitBreaker } from './circuit-breaker'; import { CommandClient } from './command-client'; import { FileClient } from './file-client'; import { GitClient } from './git-client'; import { InterpreterClient } from './interpreter-client'; import { PortClient } from './port-client'; import { ProcessClient } from './process-client'; +import { RequestQueue } from './request-queue'; import type { HttpClientOptions } from './types'; import { UtilityClient } from './utility-client'; /** * Main sandbox client that composes all domain-specific clients * Provides organized access to all sandbox functionality + * + * Resilience features (circuit breaker, request queue) are shared across + * all domain clients to provide coordinated protection against overload. */ export class SandboxClient { public readonly commands: CommandClient; @@ -20,11 +25,40 @@ export class SandboxClient { public readonly interpreter: InterpreterClient; public readonly utils: UtilityClient; + /** Shared circuit breaker instance */ + public readonly circuitBreaker?: CircuitBreaker; + + /** Shared request queue instance */ + public readonly requestQueue?: RequestQueue; + constructor(options: HttpClientOptions) { - // Ensure baseUrl is provided for all clients + // Create shared resilience instances based on configuration + const resilience = options.resilience; + + // Create circuit breaker unless explicitly disabled + if (resilience?.circuitBreaker !== false) { + this.circuitBreaker = new CircuitBreaker( + typeof resilience?.circuitBreaker === 'object' + ? resilience.circuitBreaker + : undefined + ); + } + + // Create request queue unless explicitly disabled + if (resilience?.requestQueue !== false) { + this.requestQueue = new RequestQueue( + typeof resilience?.requestQueue === 'object' + ? resilience.requestQueue + : undefined + ); + } + + // Build client options with shared instances const clientOptions: HttpClientOptions = { baseUrl: 'http://localhost:3000', - ...options + ...options, + _circuitBreaker: this.circuitBreaker, + _requestQueue: this.requestQueue }; // Initialize all domain clients with shared options @@ -36,4 +70,33 @@ export class SandboxClient { this.interpreter = new InterpreterClient(clientOptions); this.utils = new UtilityClient(clientOptions); } + + /** + * Get current resilience statistics + */ + getResilienceStats(): { + circuitBreaker?: ReturnType; + requestQueue?: ReturnType; + } { + return { + circuitBreaker: this.circuitBreaker?.getStats(), + requestQueue: this.requestQueue?.getStats() + }; + } + + /** + * Reset circuit breaker to closed state + * Useful for manual intervention after fixing issues + */ + resetCircuitBreaker(): void { + this.circuitBreaker?.reset(); + } + + /** + * Clear the request queue, rejecting all pending requests + * @param reason - Error message for rejected requests + */ + clearRequestQueue(reason?: string): void { + this.requestQueue?.clear(reason); + } } diff --git a/packages/sandbox/src/clients/types.ts b/packages/sandbox/src/clients/types.ts index b59d8af3..c4566a18 100644 --- a/packages/sandbox/src/clients/types.ts +++ b/packages/sandbox/src/clients/types.ts @@ -1,4 +1,6 @@ import type { Logger } from '@repo/shared'; +import type { CircuitBreaker, CircuitBreakerOptions } from './circuit-breaker'; +import type { RequestQueue, RequestQueueOptions } from './request-queue'; /** * Minimal interface for container fetch functionality @@ -11,6 +13,24 @@ export interface ContainerStub { ): Promise; } +/** + * Resilience configuration for the HTTP client + * Controls circuit breaker and request queue behavior + */ +export interface ResilienceOptions { + /** + * Circuit breaker configuration + * Set to false to disable circuit breaker entirely + */ + circuitBreaker?: CircuitBreakerOptions | false; + + /** + * Request queue configuration + * Set to false to disable request queuing entirely + */ + requestQueue?: RequestQueueOptions | false; +} + /** * Shared HTTP client configuration options */ @@ -27,6 +47,24 @@ export interface HttpClientOptions { command: string ) => void; onError?: (error: string, command?: string) => void; + + /** + * Resilience configuration (circuit breaker, request queue) + * These components are shared across all domain clients + */ + resilience?: ResilienceOptions; + + /** + * Shared circuit breaker instance (injected by SandboxClient) + * @internal + */ + _circuitBreaker?: CircuitBreaker; + + /** + * Shared request queue instance (injected by SandboxClient) + * @internal + */ + _requestQueue?: RequestQueue; } /** diff --git a/packages/sandbox/src/index.ts b/packages/sandbox/src/index.ts index ea964d0e..e663dcf4 100644 --- a/packages/sandbox/src/index.ts +++ b/packages/sandbox/src/index.ts @@ -1,12 +1,22 @@ // Export the main Sandbox class and utilities // Export the new client architecture +// Export resilience components for advanced usage export { + CircuitBreaker, + type CircuitBreakerOptions, + CircuitOpenError, + type CircuitState, CommandClient, FileClient, GitClient, PortClient, ProcessClient, + QueueFullError, + QueueTimeoutError, + RequestQueue, + type RequestQueueOptions, + type ResilienceOptions, SandboxClient, UtilityClient } from './clients'; diff --git a/packages/sandbox/tests/circuit-breaker.test.ts b/packages/sandbox/tests/circuit-breaker.test.ts new file mode 100644 index 00000000..988402e0 --- /dev/null +++ b/packages/sandbox/tests/circuit-breaker.test.ts @@ -0,0 +1,273 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; +import { + CircuitBreaker, + CircuitOpenError, + type CircuitState +} from '../src/clients/circuit-breaker'; + +describe('CircuitBreaker', () => { + let circuitBreaker: CircuitBreaker; + + beforeEach(() => { + vi.useFakeTimers(); + circuitBreaker = new CircuitBreaker({ + failureThreshold: 3, + failureWindow: 10_000, // 10 seconds + recoveryTimeout: 5_000, // 5 seconds + successThreshold: 2 + }); + }); + + afterEach(() => { + vi.useRealTimers(); + }); + + describe('initial state', () => { + it('should start in closed state', () => { + expect(circuitBreaker.getState()).toBe('closed'); + }); + + it('should allow requests when closed', () => { + expect(circuitBreaker.canExecute()).toBe(true); + }); + + it('should have correct initial stats', () => { + const stats = circuitBreaker.getStats(); + expect(stats.state).toBe('closed'); + expect(stats.failureCount).toBe(0); + expect(stats.successCount).toBe(0); + expect(stats.remainingRecoveryMs).toBeNull(); + }); + }); + + describe('failure tracking', () => { + it('should track failures', () => { + circuitBreaker.recordFailure(); + circuitBreaker.recordFailure(); + + const stats = circuitBreaker.getStats(); + expect(stats.failureCount).toBe(2); + expect(stats.state).toBe('closed'); // Still below threshold + }); + + it('should open circuit when failure threshold is reached', () => { + circuitBreaker.recordFailure(); + circuitBreaker.recordFailure(); + circuitBreaker.recordFailure(); + + expect(circuitBreaker.getState()).toBe('open'); + expect(circuitBreaker.canExecute()).toBe(false); + }); + + it('should clear old failures outside the window', () => { + circuitBreaker.recordFailure(); + circuitBreaker.recordFailure(); + + // Advance past the failure window + vi.advanceTimersByTime(11_000); + + const stats = circuitBreaker.getStats(); + expect(stats.failureCount).toBe(0); + }); + + it('should not open circuit if failures are spread across windows', () => { + circuitBreaker.recordFailure(); + circuitBreaker.recordFailure(); + + vi.advanceTimersByTime(11_000); // Past first failures + + circuitBreaker.recordFailure(); + circuitBreaker.recordFailure(); + + // Only 2 failures in current window, below threshold + expect(circuitBreaker.getState()).toBe('closed'); + }); + }); + + describe('state transitions', () => { + it('should transition to half-open after recovery timeout', () => { + // Open the circuit + circuitBreaker.recordFailure(); + circuitBreaker.recordFailure(); + circuitBreaker.recordFailure(); + + expect(circuitBreaker.getState()).toBe('open'); + + // Advance past recovery timeout + vi.advanceTimersByTime(6_000); + + // Check canExecute triggers transition to half-open + expect(circuitBreaker.canExecute()).toBe(true); + expect(circuitBreaker.getState()).toBe('half-open'); + }); + + it('should close circuit after success threshold in half-open', () => { + // Open the circuit + circuitBreaker.recordFailure(); + circuitBreaker.recordFailure(); + circuitBreaker.recordFailure(); + + // Wait for recovery + vi.advanceTimersByTime(6_000); + circuitBreaker.canExecute(); // Triggers half-open + + // Record successes + circuitBreaker.recordSuccess(); + expect(circuitBreaker.getState()).toBe('half-open'); + + circuitBreaker.recordSuccess(); + expect(circuitBreaker.getState()).toBe('closed'); + }); + + it('should reopen circuit on failure in half-open state', () => { + // Open the circuit + circuitBreaker.recordFailure(); + circuitBreaker.recordFailure(); + circuitBreaker.recordFailure(); + + // Wait for recovery + vi.advanceTimersByTime(6_000); + circuitBreaker.canExecute(); // Triggers half-open + + // Record a failure + circuitBreaker.recordFailure(); + + expect(circuitBreaker.getState()).toBe('open'); + }); + + it('should notify on state changes', () => { + const transitions: Array<{ from: CircuitState; to: CircuitState }> = []; + + circuitBreaker = new CircuitBreaker({ + failureThreshold: 2, + recoveryTimeout: 5_000, + successThreshold: 1, + onStateChange: (to, from) => { + transitions.push({ from, to }); + } + }); + + // Trigger closed -> open + circuitBreaker.recordFailure(); + circuitBreaker.recordFailure(); + + expect(transitions).toHaveLength(1); + expect(transitions[0]).toEqual({ from: 'closed', to: 'open' }); + + // Trigger open -> half-open + vi.advanceTimersByTime(6_000); + circuitBreaker.canExecute(); + + expect(transitions).toHaveLength(2); + expect(transitions[1]).toEqual({ from: 'open', to: 'half-open' }); + + // Trigger half-open -> closed + circuitBreaker.recordSuccess(); + + expect(transitions).toHaveLength(3); + expect(transitions[2]).toEqual({ from: 'half-open', to: 'closed' }); + }); + }); + + describe('execute method', () => { + it('should execute function when circuit is closed', async () => { + const fn = vi.fn().mockResolvedValue('result'); + + const result = await circuitBreaker.execute(fn); + + expect(result).toBe('result'); + expect(fn).toHaveBeenCalled(); + }); + + it('should record success on successful execution', async () => { + const fn = vi.fn().mockResolvedValue('result'); + + await circuitBreaker.execute(fn); + + // Open circuit first to test half-open behavior + circuitBreaker.recordFailure(); + circuitBreaker.recordFailure(); + circuitBreaker.recordFailure(); + + vi.advanceTimersByTime(6_000); + circuitBreaker.canExecute(); // half-open + + await circuitBreaker.execute(fn); + await circuitBreaker.execute(fn); + + expect(circuitBreaker.getState()).toBe('closed'); + }); + + it('should record failure on failed execution', async () => { + const fn = vi.fn().mockRejectedValue(new Error('fail')); + + await expect(circuitBreaker.execute(fn)).rejects.toThrow('fail'); + await expect(circuitBreaker.execute(fn)).rejects.toThrow('fail'); + await expect(circuitBreaker.execute(fn)).rejects.toThrow('fail'); + + expect(circuitBreaker.getState()).toBe('open'); + }); + + it('should throw CircuitOpenError when circuit is open', async () => { + // Open the circuit + circuitBreaker.recordFailure(); + circuitBreaker.recordFailure(); + circuitBreaker.recordFailure(); + + const fn = vi.fn().mockResolvedValue('result'); + + await expect(circuitBreaker.execute(fn)).rejects.toThrow( + CircuitOpenError + ); + expect(fn).not.toHaveBeenCalled(); + }); + + it('should include remaining recovery time in CircuitOpenError', async () => { + // Open the circuit + circuitBreaker.recordFailure(); + circuitBreaker.recordFailure(); + circuitBreaker.recordFailure(); + + // Advance partway through recovery + vi.advanceTimersByTime(2_000); + + const fn = vi.fn().mockResolvedValue('result'); + + try { + await circuitBreaker.execute(fn); + } catch (error) { + expect(error).toBeInstanceOf(CircuitOpenError); + expect((error as CircuitOpenError).remainingMs).toBeCloseTo(3_000, -2); + } + }); + }); + + describe('reset', () => { + it('should reset circuit to closed state', () => { + // Open the circuit + circuitBreaker.recordFailure(); + circuitBreaker.recordFailure(); + circuitBreaker.recordFailure(); + + expect(circuitBreaker.getState()).toBe('open'); + + circuitBreaker.reset(); + + expect(circuitBreaker.getState()).toBe('closed'); + expect(circuitBreaker.getStats().failureCount).toBe(0); + }); + }); + + describe('default options', () => { + it('should use sensible defaults', () => { + const defaultBreaker = new CircuitBreaker(); + + // Record 5 failures (default threshold) + for (let i = 0; i < 5; i++) { + defaultBreaker.recordFailure(); + } + + expect(defaultBreaker.getState()).toBe('open'); + }); + }); +}); diff --git a/packages/sandbox/tests/request-queue.test.ts b/packages/sandbox/tests/request-queue.test.ts new file mode 100644 index 00000000..65b3aa38 --- /dev/null +++ b/packages/sandbox/tests/request-queue.test.ts @@ -0,0 +1,503 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; +import { + QueueFullError, + QueueTimeoutError, + RequestQueue +} from '../src/clients/request-queue'; + +describe('RequestQueue', () => { + let queue: RequestQueue; + + beforeEach(() => { + vi.useFakeTimers(); + queue = new RequestQueue({ + maxConcurrent: 2, + maxQueueSize: 3, + queueTimeout: 5_000 + }); + }); + + afterEach(() => { + vi.useRealTimers(); + }); + + describe('initial state', () => { + it('should have correct initial stats', () => { + const stats = queue.getStats(); + expect(stats.activeCount).toBe(0); + expect(stats.queueLength).toBe(0); + expect(stats.maxConcurrent).toBe(2); + expect(stats.maxQueueSize).toBe(3); + }); + + it('should have capacity when empty', () => { + expect(queue.hasCapacity()).toBe(true); + }); + }); + + describe('basic execution', () => { + it('should execute requests immediately when under concurrency limit', async () => { + const fn = vi.fn().mockResolvedValue('result'); + + const result = await queue.execute(fn); + + expect(result).toBe('result'); + expect(fn).toHaveBeenCalled(); + }); + + it('should track active count during execution', async () => { + let activeCountDuringExecution = -1; + + const fn = vi.fn().mockImplementation(async () => { + activeCountDuringExecution = queue.getStats().activeCount; + return 'result'; + }); + + await queue.execute(fn); + + expect(activeCountDuringExecution).toBe(1); + expect(queue.getStats().activeCount).toBe(0); + }); + + it('should allow concurrent requests up to limit', async () => { + const results: string[] = []; + const resolvers: Array<(value: string) => void> = []; + + // Start 2 concurrent requests (at limit) + const request1 = queue.execute( + () => + new Promise((resolve) => { + resolvers.push(resolve); + }) + ); + const request2 = queue.execute( + () => + new Promise((resolve) => { + resolvers.push(resolve); + }) + ); + + // Both should be executing + expect(queue.getStats().activeCount).toBe(2); + expect(queue.getStats().queueLength).toBe(0); + + // Resolve both + resolvers[0]('result1'); + resolvers[1]('result2'); + + results.push(await request1); + results.push(await request2); + + expect(results).toEqual(['result1', 'result2']); + }); + }); + + describe('queueing behavior', () => { + it('should queue requests when at concurrency limit', async () => { + const resolvers: Array<(value: string) => void> = []; + + // Fill up concurrent slots + queue.execute( + () => + new Promise((resolve) => { + resolvers.push(resolve); + }) + ); + queue.execute( + () => + new Promise((resolve) => { + resolvers.push(resolve); + }) + ); + + // This should be queued + const queued = queue.execute(() => Promise.resolve('queued')); + + expect(queue.getStats().activeCount).toBe(2); + expect(queue.getStats().queueLength).toBe(1); + + // Resolve one active request + resolvers[0]('first'); + + // Allow queued request to process + await vi.advanceTimersByTimeAsync(0); + + const result = await queued; + expect(result).toBe('queued'); + }); + + it('should process queue in FIFO order', async () => { + const results: number[] = []; + const resolvers: Array<(value: number) => void> = []; + + // Fill concurrent slots + const active1 = queue.execute( + () => + new Promise((resolve) => { + resolvers.push(resolve); + }) + ); + const active2 = queue.execute( + () => + new Promise((resolve) => { + resolvers.push(resolve); + }) + ); + + // Queue additional requests + const queued1 = queue.execute(async () => { + results.push(1); + return 1; + }); + const queued2 = queue.execute(async () => { + results.push(2); + return 2; + }); + const queued3 = queue.execute(async () => { + results.push(3); + return 3; + }); + + // Resolve active requests + resolvers[0](0); + resolvers[1](0); + + await active1; + await active2; + await vi.advanceTimersByTimeAsync(0); + await queued1; + await vi.advanceTimersByTimeAsync(0); + await queued2; + await vi.advanceTimersByTimeAsync(0); + await queued3; + + expect(results).toEqual([1, 2, 3]); + }); + + it('should notify when requests are queued', async () => { + const onQueued = vi.fn(); + queue = new RequestQueue({ + maxConcurrent: 1, + maxQueueSize: 5, + onQueued + }); + + const resolver: { resolve?: () => void } = {}; + + // Fill concurrent slot + queue.execute( + () => + new Promise((resolve) => { + resolver.resolve = resolve; + }) + ); + + // Queue a request + queue.execute(() => Promise.resolve()); + + expect(onQueued).toHaveBeenCalledWith(1); + + // Cleanup + resolver.resolve?.(); + }); + + it('should notify when requests are dequeued with wait time', async () => { + const onDequeued = vi.fn(); + queue = new RequestQueue({ + maxConcurrent: 1, + maxQueueSize: 5, + queueTimeout: 30_000, + onDequeued + }); + + const resolver: { resolve?: () => void } = {}; + + // Fill concurrent slot + queue.execute( + () => + new Promise((resolve) => { + resolver.resolve = resolve; + }) + ); + + // Queue a request + const queued = queue.execute(() => Promise.resolve()); + + // Wait some time + vi.advanceTimersByTime(1_000); + + // Release the active slot + resolver.resolve?.(); + + await vi.advanceTimersByTimeAsync(0); + await queued; + + expect(onDequeued).toHaveBeenCalled(); + expect(onDequeued.mock.calls[0][0]).toBeGreaterThanOrEqual(1_000); + }); + }); + + describe('queue limits', () => { + it('should throw QueueFullError when queue is full', async () => { + const resolvers: Array<() => void> = []; + + // Fill concurrent slots + queue.execute( + () => + new Promise((resolve) => { + resolvers.push(resolve); + }) + ); + queue.execute( + () => + new Promise((resolve) => { + resolvers.push(resolve); + }) + ); + + // Fill queue + queue.execute(() => Promise.resolve()); + queue.execute(() => Promise.resolve()); + queue.execute(() => Promise.resolve()); + + expect(queue.getStats().queueLength).toBe(3); + + // This should throw (queue is full) + await expect(queue.execute(() => Promise.resolve())).rejects.toThrow( + QueueFullError + ); + + // Cleanup + for (const r of resolvers) { + r(); + } + }); + + it('should include queue size in QueueFullError', () => { + // Fill concurrent slots + queue.execute(() => new Promise(() => {})); + queue.execute(() => new Promise(() => {})); + + // Fill queue + queue.execute(() => Promise.resolve()); + queue.execute(() => Promise.resolve()); + queue.execute(() => Promise.resolve()); + + try { + queue.execute(() => Promise.resolve()); + } catch (error) { + expect(error).toBeInstanceOf(QueueFullError); + expect((error as QueueFullError).queueSize).toBe(3); + } + }); + }); + + describe('queue timeout', () => { + it('should timeout queued requests after queueTimeout', async () => { + const resolver: { resolve?: () => void } = {}; + + // Fill concurrent slots + queue.execute( + () => + new Promise((resolve) => { + resolver.resolve = resolve; + }) + ); + queue.execute(() => new Promise(() => {})); + + // Queue a request + const queuedPromise = queue.execute(() => Promise.resolve('result')); + + // Advance past timeout + vi.advanceTimersByTime(6_000); + + await expect(queuedPromise).rejects.toThrow(QueueTimeoutError); + }); + + it('should include wait time in QueueTimeoutError', async () => { + // Fill concurrent slots + queue.execute(() => new Promise(() => {})); + queue.execute(() => new Promise(() => {})); + + // Queue a request + const queuedPromise = queue.execute(() => Promise.resolve()); + + // Advance past timeout + vi.advanceTimersByTime(6_000); + + try { + await queuedPromise; + } catch (error) { + expect(error).toBeInstanceOf(QueueTimeoutError); + expect((error as QueueTimeoutError).waitTime).toBeGreaterThanOrEqual( + 5_000 + ); + } + }); + + it('should remove timed-out requests from queue', async () => { + // Fill concurrent slots + queue.execute(() => new Promise(() => {})); + queue.execute(() => new Promise(() => {})); + + // Queue requests with different timeouts by using a longer timeout queue + const longTimeoutQueue = new RequestQueue({ + maxConcurrent: 2, + maxQueueSize: 5, + queueTimeout: 10_000 // Longer timeout + }); + + // Fill concurrent slots + longTimeoutQueue.execute(() => new Promise(() => {})); + longTimeoutQueue.execute(() => new Promise(() => {})); + + // Queue two requests + const queued1 = longTimeoutQueue.execute(() => Promise.resolve(1)); + longTimeoutQueue.execute(() => Promise.resolve(2)); + + expect(longTimeoutQueue.getStats().queueLength).toBe(2); + + // Timeout all requests (advance past the 10s timeout) + vi.advanceTimersByTime(11_000); + + // Both should have timed out + await expect(queued1).rejects.toThrow(QueueTimeoutError); + + // Both requests should have been removed + expect(longTimeoutQueue.getStats().queueLength).toBe(0); + }); + }); + + describe('error handling', () => { + it('should propagate errors from executed functions', async () => { + const fn = vi.fn().mockRejectedValue(new Error('execution failed')); + + await expect(queue.execute(fn)).rejects.toThrow('execution failed'); + }); + + it('should decrement active count on error', async () => { + const fn = vi.fn().mockRejectedValue(new Error('fail')); + + try { + await queue.execute(fn); + } catch { + // Expected + } + + expect(queue.getStats().activeCount).toBe(0); + }); + + it('should process queue after error in active request', async () => { + let resolver: { resolve?: () => void } = {}; + + // Fill one slot with a request that will fail + const failing = queue.execute( + () => + new Promise((_, reject) => { + resolver = { resolve: () => reject(new Error('fail')) }; + }) + ); + + // Fill another slot + queue.execute(() => new Promise(() => {})); + + // Queue a request + const queued = queue.execute(() => Promise.resolve('queued result')); + + // Fail the first request + resolver.resolve?.(); + + try { + await failing; + } catch { + // Expected + } + + await vi.advanceTimersByTimeAsync(0); + + const result = await queued; + expect(result).toBe('queued result'); + }); + }); + + describe('clear', () => { + it('should clear all queued requests', async () => { + // Fill concurrent slots + queue.execute(() => new Promise(() => {})); + queue.execute(() => new Promise(() => {})); + + // Queue requests + const queued1 = queue.execute(() => Promise.resolve()); + const queued2 = queue.execute(() => Promise.resolve()); + + expect(queue.getStats().queueLength).toBe(2); + + queue.clear('Test clear'); + + expect(queue.getStats().queueLength).toBe(0); + + await expect(queued1).rejects.toThrow('Test clear'); + await expect(queued2).rejects.toThrow('Test clear'); + }); + + it('should not affect active requests when clearing', async () => { + const resolver: { resolve?: (value: string) => void } = {}; + + // Start an active request + const active = queue.execute( + () => + new Promise((resolve) => { + resolver.resolve = resolve; + }) + ); + + // Queue a request + queue.execute(() => Promise.resolve()); + + queue.clear(); + + // Active request should still complete + resolver.resolve?.('active result'); + + const result = await active; + expect(result).toBe('active result'); + }); + }); + + describe('hasCapacity', () => { + it('should return true when under limits', () => { + expect(queue.hasCapacity()).toBe(true); + }); + + it('should return true when at concurrency limit but queue has space', () => { + // Fill concurrent slots + queue.execute(() => new Promise(() => {})); + queue.execute(() => new Promise(() => {})); + + expect(queue.hasCapacity()).toBe(true); + }); + + it('should return false when queue is full', () => { + // Fill concurrent slots + queue.execute(() => new Promise(() => {})); + queue.execute(() => new Promise(() => {})); + + // Fill queue + queue.execute(() => Promise.resolve()); + queue.execute(() => Promise.resolve()); + queue.execute(() => Promise.resolve()); + + expect(queue.hasCapacity()).toBe(false); + }); + }); + + describe('default options', () => { + it('should use sensible defaults', () => { + const defaultQueue = new RequestQueue(); + const stats = defaultQueue.getStats(); + + expect(stats.maxConcurrent).toBe(10); + expect(stats.maxQueueSize).toBe(100); + }); + }); +}); From a27578413e98051ffaea09f067704f5fb2b65db4 Mon Sep 17 00:00:00 2001 From: katereznykova Date: Thu, 11 Dec 2025 12:29:56 +0000 Subject: [PATCH 2/7] update lock --- package-lock.json | 35 +++++++++++++++++++++++++++++++---- 1 file changed, 31 insertions(+), 4 deletions(-) diff --git a/package-lock.json b/package-lock.json index 0dacb840..3723f9aa 100644 --- a/package-lock.json +++ b/package-lock.json @@ -415,6 +415,7 @@ "resolved": "https://registry.npmjs.org/picomatch/-/picomatch-4.0.3.tgz", "integrity": "sha512-5gTmgEY/sqK6gFXLIsQNH19lWb4ebPDLA4SdLP7dsWkIXHWlG66oPuVvXSGFPppYZz8ZDZq0dYYrbHfBCVUb1Q==", "license": "MIT", + "peer": true, "engines": { "node": ">=12" }, @@ -436,6 +437,7 @@ "resolved": "https://registry.npmjs.org/vite/-/vite-6.4.1.tgz", "integrity": "sha512-+Oxm7q9hDoLMyJOYfUYBuHQo+dkAloi33apOPP56pzj+vsdJDzr+j1NISE5pyaAuKL4A3UD34qd0lx5+kfKp2g==", "license": "MIT", + "peer": true, "dependencies": { "esbuild": "^0.25.0", "fdir": "^6.4.4", @@ -575,6 +577,7 @@ "resolved": "https://registry.npmjs.org/@babel/core/-/core-7.28.5.tgz", "integrity": "sha512-e7jT4DxYvIDLk1ZHmU/m/mB19rex9sv0c2ftBtjSBv+kVM/902eh0fINUzD7UwLLNR+jU585GxUJ8/EBfAM5fw==", "license": "MIT", + "peer": true, "dependencies": { "@babel/code-frame": "^7.27.1", "@babel/generator": "^7.28.5", @@ -1524,7 +1527,8 @@ "resolved": "https://registry.npmjs.org/@cloudflare/workers-types/-/workers-types-4.20251126.0.tgz", "integrity": "sha512-DSeI1Q7JYmh5/D/tw5eZCjrKY34v69rwj63hHt60nSQW5QLwWCbj/lLtNz9f2EPa+JCACwpLXHgCXfzJ29x66w==", "devOptional": true, - "license": "MIT OR Apache-2.0" + "license": "MIT OR Apache-2.0", + "peer": true }, "node_modules/@cspotcode/source-map-support": { "version": "0.8.1", @@ -2838,6 +2842,7 @@ "integrity": "sha512-/g2d4sW9nUDJOMz3mabVQvOGhVa4e/BN/Um7yca9Bb2XTzPPnfTWHWQg+IsEYO7M3Vx+EXvaM/I2pJWIMun1bg==", "dev": true, "license": "MIT", + "peer": true, "dependencies": { "@octokit/auth-token": "^4.0.0", "@octokit/graphql": "^7.1.0", @@ -4322,6 +4327,7 @@ "resolved": "https://registry.npmjs.org/@types/node/-/node-24.10.1.tgz", "integrity": "sha512-GNWcUTRBgIRJD5zj+Tq0fKOJ5XZajIiBroOF0yvj2bSU1WvNdYS/dn9UxwsujGW4JX06dnHyjV2y9rRaybH0iQ==", "license": "MIT", + "peer": true, "dependencies": { "undici-types": "~7.16.0" } @@ -4337,6 +4343,7 @@ "resolved": "https://registry.npmjs.org/@types/react/-/react-19.2.7.tgz", "integrity": "sha512-MWtvHrGZLFttgeEj28VXHxpmwYbor/ATPYbBfSFZEIRK0ecCFLl2Qo55z52Hss+UV9CRN7trSeq1zbgx7YDWWg==", "license": "MIT", + "peer": true, "dependencies": { "csstype": "^3.2.2" } @@ -4346,6 +4353,7 @@ "resolved": "https://registry.npmjs.org/@types/react-dom/-/react-dom-19.2.3.tgz", "integrity": "sha512-jp2L/eY6fn+KgVVQAOqYItbF0VY/YApe5Mz2F0aykSO8gx31bYCZyvSeYxCHKvzHG5eZjc+zyaS5BrBWya2+kQ==", "license": "MIT", + "peer": true, "peerDependencies": { "@types/react": "^19.2.0" } @@ -4472,6 +4480,7 @@ "integrity": "sha512-oukfKT9Mk41LreEW09vt45f8wx7DordoWUZMYdY/cyAk7w5TWkTRCNZYF7sX7n2wB7jyGAl74OxgwhPgKaqDMQ==", "dev": true, "license": "MIT", + "peer": true, "dependencies": { "@vitest/utils": "3.2.4", "pathe": "^2.0.3", @@ -4487,6 +4496,7 @@ "integrity": "sha512-dEYtS7qQP2CjU27QBC5oUOxLE/v5eLkGqPE0ZKEIDGMs4vKWe7IjgLOeauHsR0D5YuuycGRO5oSRXnwnmA78fQ==", "dev": true, "license": "MIT", + "peer": true, "dependencies": { "@vitest/pretty-format": "3.2.4", "magic-string": "^0.30.17", @@ -4515,6 +4525,7 @@ "integrity": "sha512-hGISOaP18plkzbWEcP/QvtRW1xDXF2+96HbEX6byqQhAUbiS5oH6/9JwW+QsQCIYON2bI6QZBF+2PvOmrRZ9wA==", "dev": true, "license": "MIT", + "peer": true, "dependencies": { "@vitest/utils": "3.2.4", "fflate": "^0.8.2", @@ -4662,6 +4673,7 @@ "resolved": "https://registry.npmjs.org/ajv/-/ajv-8.17.1.tgz", "integrity": "sha512-B/gBuNg5SiMTrPkC+A2+cW0RszwxYmn6VYxB/inlBStS5nx6xHIt/ehKRhIMhqusl7a8LjQoZnjCs5vhwxOQ1g==", "license": "MIT", + "peer": true, "dependencies": { "fast-deep-equal": "^3.1.3", "fast-uri": "^3.0.1", @@ -5407,6 +5419,7 @@ "resolved": "https://registry.npmjs.org/picomatch/-/picomatch-4.0.3.tgz", "integrity": "sha512-5gTmgEY/sqK6gFXLIsQNH19lWb4ebPDLA4SdLP7dsWkIXHWlG66oPuVvXSGFPppYZz8ZDZq0dYYrbHfBCVUb1Q==", "license": "MIT", + "peer": true, "engines": { "node": ">=12" }, @@ -5700,6 +5713,7 @@ } ], "license": "MIT", + "peer": true, "dependencies": { "baseline-browser-mapping": "^2.8.25", "caniuse-lite": "^1.0.30001754", @@ -7634,6 +7648,7 @@ "integrity": "sha512-ekilCSN1jwRvIbgeg/57YFh8qQDNbwDb9xT/qu2DAHbFFZUicIl4ygVaAvzveMhMVr3LnpSKTNnwt8PoOfmKhQ==", "devOptional": true, "license": "MIT", + "peer": true, "bin": { "jiti": "lib/jiti-cli.mjs" } @@ -7898,6 +7913,7 @@ "integrity": "sha512-utfs7Pr5uJyyvDETitgsaqSyjCb2qNRAtuqUeWIAKztsOYdcACf2KtARYXg2pSvhkt+9NfoaNY7fxjl6nuMjIQ==", "devOptional": true, "license": "MPL-2.0", + "peer": true, "dependencies": { "detect-libc": "^2.0.3" }, @@ -8183,7 +8199,6 @@ "resolved": "https://registry.npmjs.org/loose-envify/-/loose-envify-1.4.0.tgz", "integrity": "sha512-lyuxPGr/Wfhrlem2CL/UcnUc1zcqKAImBDzukY7Y5F/yQiNdko6+fRLevlw1HgMySw7f611UIY408EtxRSoK3Q==", "license": "MIT", - "peer": true, "dependencies": { "js-tokens": "^3.0.0 || ^4.0.0" }, @@ -9752,6 +9767,7 @@ "resolved": "https://registry.npmjs.org/prettier/-/prettier-3.6.2.tgz", "integrity": "sha512-I7AIg5boAr5R0FFtJ6rCfD+LFsWHp81dolrFD8S79U9tb8Az2nGrJncnMSnys+bpQJfRUzqs9hnA81OAA3hCuQ==", "license": "MIT", + "peer": true, "bin": { "prettier": "bin/prettier.cjs" }, @@ -9968,6 +9984,7 @@ "resolved": "https://registry.npmjs.org/react/-/react-19.2.0.tgz", "integrity": "sha512-tmbWg6W31tQLeB5cdIBOicJDJRR2KzXsV7uSK9iNfLWQ5bIZfxuPEHp7M8wiHyHnn0DD1i7w3Zmin0FtkrwoCQ==", "license": "MIT", + "peer": true, "engines": { "node": ">=0.10.0" } @@ -9977,6 +9994,7 @@ "resolved": "https://registry.npmjs.org/react-dom/-/react-dom-19.2.0.tgz", "integrity": "sha512-UlbRu4cAiGaIewkPyiRGJk0imDN2T3JjieT6spoL2UeSf5od4n5LB/mQ4ejmxhCFT1tYe8IvaFulzynWovsEFQ==", "license": "MIT", + "peer": true, "dependencies": { "scheduler": "^0.27.0" }, @@ -9988,8 +10006,7 @@ "version": "16.13.1", "resolved": "https://registry.npmjs.org/react-is/-/react-is-16.13.1.tgz", "integrity": "sha512-24e6ynE2H+OKt4kqsOvNd8kBpV65zoxbA4BVsEOB3ARVWQki/DHzaUoC5KuON/BiccDaCCTZBuOcfZs70kR8bQ==", - "license": "MIT", - "peer": true + "license": "MIT" }, "node_modules/react-katex": { "version": "3.1.0", @@ -10363,6 +10380,7 @@ "integrity": "sha512-ZRLgPlS91l4JztLYEZnmMcd3Umcla1hkXJgiEiR4HloRJBBoeaX8qogTu5Jfu36rRMVLndzqYv0h+M5gJAkUfg==", "dev": true, "license": "MIT", + "peer": true, "dependencies": { "@oxc-project/types": "=0.98.0", "@rolldown/pluginutils": "1.0.0-beta.51" @@ -11178,6 +11196,7 @@ "resolved": "https://registry.npmjs.org/picomatch/-/picomatch-4.0.3.tgz", "integrity": "sha512-5gTmgEY/sqK6gFXLIsQNH19lWb4ebPDLA4SdLP7dsWkIXHWlG66oPuVvXSGFPppYZz8ZDZq0dYYrbHfBCVUb1Q==", "license": "MIT", + "peer": true, "engines": { "node": ">=12" }, @@ -11377,6 +11396,7 @@ "integrity": "sha512-ytQKuwgmrrkDTFP4LjR0ToE2nqgy886GpvRSpU0JAnrdBYppuY5rLkRUYPU1yCryb24SsKBTL/hlDQAEFVwtZg==", "devOptional": true, "license": "MIT", + "peer": true, "dependencies": { "esbuild": "~0.25.0", "get-tsconfig": "^4.7.5" @@ -11536,6 +11556,7 @@ "resolved": "https://registry.npmjs.org/typescript/-/typescript-5.9.3.tgz", "integrity": "sha512-jl1vZzPDinLr9eUt3J/t7V6FgNEw9QjvBPdysz9KfQDD41fQrC2Y4vKQdiaUpFT4bXlb1RHhLpp8wtm6M5TgSw==", "license": "Apache-2.0", + "peer": true, "bin": { "tsc": "bin/tsc", "tsserver": "bin/tsserver" @@ -11605,6 +11626,7 @@ "resolved": "https://registry.npmjs.org/unenv/-/unenv-2.0.0-rc.24.tgz", "integrity": "sha512-i7qRCmY42zmCwnYlh9H2SvLEypEFGye5iRmEMKjcGi7zk9UquigRjFtTLz0TYqr0ZGLZhaMHl/foy1bZR+Cwlw==", "license": "MIT", + "peer": true, "dependencies": { "pathe": "^2.0.3" } @@ -12046,6 +12068,7 @@ "resolved": "https://registry.npmjs.org/vite/-/vite-7.2.4.tgz", "integrity": "sha512-NL8jTlbo0Tn4dUEXEsUg8KeyG/Lkmc4Fnzb8JXN/Ykm9G4HNImjtABMJgkQoVjOBN/j2WAwDTRytdqJbZsah7w==", "license": "MIT", + "peer": true, "dependencies": { "esbuild": "^0.25.0", "fdir": "^6.5.0", @@ -12160,6 +12183,7 @@ "resolved": "https://registry.npmjs.org/picomatch/-/picomatch-4.0.3.tgz", "integrity": "sha512-5gTmgEY/sqK6gFXLIsQNH19lWb4ebPDLA4SdLP7dsWkIXHWlG66oPuVvXSGFPppYZz8ZDZq0dYYrbHfBCVUb1Q==", "license": "MIT", + "peer": true, "engines": { "node": ">=12" }, @@ -12192,6 +12216,7 @@ "integrity": "sha512-LUCP5ev3GURDysTWiP47wRRUpLKMOfPh+yKTx3kVIEiu5KOMeqzpnYNsKyOoVrULivR8tLcks4+lga33Whn90A==", "dev": true, "license": "MIT", + "peer": true, "dependencies": { "@types/chai": "^5.2.2", "@vitest/expect": "3.2.4", @@ -12617,6 +12642,7 @@ "integrity": "sha512-Om5ns0Lyx/LKtYI04IV0bjIrkBgoFNg0p6urzr2asekJlfP18RqFzyqMFZKf0i9Gnjtz/JfAS/Ol6tjCe5JJsQ==", "hasInstallScript": true, "license": "Apache-2.0", + "peer": true, "bin": { "workerd": "bin/workerd" }, @@ -13380,6 +13406,7 @@ "resolved": "https://registry.npmjs.org/zod/-/zod-3.25.76.tgz", "integrity": "sha512-gzUt/qt81nXsFGKIFcC3YnfEAx5NkunCfnDlvuBSSFS02bcXu4Lmea0AFIUwbLWxWPx3d9p8S5QoaujKcNQxcQ==", "license": "MIT", + "peer": true, "funding": { "url": "https://github.com/sponsors/colinhacks" } From 44f3b5e0123a87d1e9f561613973bbdf4d48b5fe Mon Sep 17 00:00:00 2001 From: katereznykova Date: Thu, 11 Dec 2025 12:49:46 +0000 Subject: [PATCH 3/7] update docs --- packages/sandbox/src/clients/circuit-breaker.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/packages/sandbox/src/clients/circuit-breaker.ts b/packages/sandbox/src/clients/circuit-breaker.ts index c5b6721c..de10fa1e 100644 --- a/packages/sandbox/src/clients/circuit-breaker.ts +++ b/packages/sandbox/src/clients/circuit-breaker.ts @@ -159,7 +159,6 @@ export class CircuitBreaker { /** * Record a failed request - * Only counts failures for 5xx errors (server errors) */ recordFailure(): void { const now = Date.now(); From 5dc260bb2a9f0d0a98e51e86575a83bd18f1e0f8 Mon Sep 17 00:00:00 2001 From: katereznykova Date: Thu, 11 Dec 2025 13:10:37 +0000 Subject: [PATCH 4/7] improve PID management in Session class --- .../src/services/process-service.ts | 1 + packages/sandbox-container/src/session.ts | 136 ++++++++++++++++-- 2 files changed, 129 insertions(+), 8 deletions(-) diff --git a/packages/sandbox-container/src/services/process-service.ts b/packages/sandbox-container/src/services/process-service.ts index 6aa4cac0..38038af1 100644 --- a/packages/sandbox-container/src/services/process-service.ts +++ b/packages/sandbox-container/src/services/process-service.ts @@ -151,6 +151,7 @@ export class ProcessService { async (event) => { // Route events to process record listeners if (event.type === 'start' && event.pid !== undefined) { + processRecord.pid = event.pid; await this.store.update(processRecord.id, { pid: event.pid }); } else if (event.type === 'stdout' && event.data) { processRecord.stdout += event.data; diff --git a/packages/sandbox-container/src/session.ts b/packages/sandbox-container/src/session.ts index dc385ae1..5fba5cae 100644 --- a/packages/sandbox-container/src/session.ts +++ b/packages/sandbox-container/src/session.ts @@ -24,8 +24,8 @@ */ import { randomUUID } from 'node:crypto'; -import { watch } from 'node:fs'; -import { mkdir, rm, stat } from 'node:fs/promises'; +import { createReadStream, watch } from 'node:fs'; +import { mkdir, open, rm, stat } from 'node:fs/promises'; import { tmpdir } from 'node:os'; import { basename, dirname, join } from 'node:path'; import type { ExecEvent, Logger } from '@repo/shared'; @@ -335,6 +335,7 @@ export class Session { const logFile = join(this.sessionDir!, `${commandId}.log`); const exitCodeFile = join(this.sessionDir!, `${commandId}.exit`); const pidFile = join(this.sessionDir!, `${commandId}.pid`); + const pidPipe = join(this.sessionDir!, `${commandId}.pid.pipe`); const labelersDoneFile = join( this.sessionDir!, `${commandId}.labelers.done` @@ -351,6 +352,10 @@ export class Session { // Track command this.trackCommand(commandId, pidFile, logFile, exitCodeFile); + // Create PID notification FIFO before sending command + // This ensures synchronization: shell writes PID, we read it (blocking) + await this.createPidPipe(pidPipe); + // Build FIFO script for BACKGROUND execution // Command runs concurrently, shell continues immediately const bashScript = this.buildFIFOScript( @@ -360,7 +365,8 @@ export class Session { exitCodeFile, options?.cwd, true, - options?.env + options?.env, + pidPipe ); if (this.shell!.stdin && typeof this.shell!.stdin !== 'number') { @@ -369,14 +375,14 @@ export class Session { throw new Error('Shell stdin is not available'); } - // Wait for PID file to be created (bash script writes it after starting command) - const pid = await this.waitForPidFile(pidFile); + // Wait for PID via FIFO (blocking read - guarantees synchronization) + const pid = await this.waitForPidViaPipe(pidPipe, pidFile); if (pid === undefined) { - this.logger.warn('PID file not created within timeout', { + this.logger.warn('PID not received within timeout', { sessionId: this.id, commandId, - pidFile + pidPipe }); } @@ -690,6 +696,7 @@ export class Session { * * @param isBackground - If true, command runs in background (for execStream/startProcess) * If false, command runs in foreground (for exec) - state persists! + * @param pidPipe - Optional path to PID notification FIFO (for reliable PID synchronization) */ private buildFIFOScript( command: string, @@ -698,7 +705,8 @@ export class Session { exitCodeFile: string, cwd?: string, isBackground = false, - env?: Record + env?: Record, + pidPipe?: string ): string { // Create unique FIFO names to prevent collisions const stdoutPipe = join(this.sessionDir!, `${cmdId}.stdout.pipe`); @@ -714,6 +722,7 @@ export class Session { const safeSessionDir = this.escapeShellPath(this.sessionDir!); const safePidFile = this.escapeShellPath(pidFile); const safeLabelersDoneFile = this.escapeShellPath(labelersDoneFile); + const safePidPipe = pidPipe ? this.escapeShellPath(pidPipe) : null; const indentLines = (input: string, spaces: number) => { const prefix = ' '.repeat(spaces); @@ -794,6 +803,10 @@ export class Session { script += ` # Write PID for process killing\n`; script += ` echo "$CMD_PID" > ${safePidFile}.tmp\n`; script += ` mv ${safePidFile}.tmp ${safePidFile}\n`; + if (safePidPipe) { + script += ` # Notify PID via FIFO (unblocks waitForPidViaPipe)\n`; + script += ` echo "$CMD_PID" > ${safePidPipe}\n`; + } script += ` # Background monitor: waits for labelers to finish (after FIFO EOF)\n`; script += ` # and then removes the FIFOs. PID file is cleaned up by TypeScript.\n`; script += ` (\n`; @@ -806,6 +819,10 @@ export class Session { script += ` else\n`; script += ` printf '\\x02\\x02\\x02%s\\n' "Failed to change directory to ${safeCwd}" >> "$log"\n`; script += ` EXIT_CODE=1\n`; + if (safePidPipe) { + script += ` # Notify error via FIFO (unblocks waitForPidViaPipe with empty/error)\n`; + script += ` echo "" > ${safePidPipe}\n`; + } script += ` fi\n`; } else { script += ` # Execute command in BACKGROUND (runs in subshell, enables concurrency)\n`; @@ -818,6 +835,10 @@ export class Session { script += ` # Write PID for process killing\n`; script += ` echo "$CMD_PID" > ${safePidFile}.tmp\n`; script += ` mv ${safePidFile}.tmp ${safePidFile}\n`; + if (safePidPipe) { + script += ` # Notify PID via FIFO (unblocks waitForPidViaPipe)\n`; + script += ` echo "$CMD_PID" > ${safePidPipe}\n`; + } script += ` # Background monitor: waits for labelers to finish (after FIFO EOF)\n`; script += ` # and then removes the FIFOs. PID file is cleaned up by TypeScript.\n`; script += ` (\n`; @@ -1111,6 +1132,105 @@ export class Session { return undefined; } + /** + * Create a FIFO (named pipe) for PID notification + * This must be created BEFORE sending the command to the shell + */ + private async createPidPipe(pidPipe: string): Promise { + // Remove any existing pipe first + try { + await rm(pidPipe, { force: true }); + } catch { + // Ignore errors + } + + // Create the FIFO using mkfifo command + const result = Bun.spawnSync(['mkfifo', pidPipe]); + if (result.exitCode !== 0) { + throw new Error(`Failed to create PID pipe: ${result.stderr.toString()}`); + } + } + + /** + * Wait for PID via FIFO with fallback to file polling + * + * Uses a FIFO for reliable synchronization: the shell writes the PID to the pipe, + * and we do a blocking read. This eliminates race conditions from file polling. + * Falls back to file polling if FIFO read fails (e.g., pipe broken). + * + * @param pidPipe - Path to the PID notification FIFO + * @param pidFile - Path to the PID file (fallback) + * @param timeoutMs - Timeout for waiting + * @returns The PID or undefined if not available within timeout + */ + private async waitForPidViaPipe( + pidPipe: string, + pidFile: string, + timeoutMs: number = 5000 + ): Promise { + try { + // Read from FIFO with timeout + // Opening a FIFO for reading blocks until a writer opens it + const pid = await Promise.race([ + this.readPidFromPipe(pidPipe), + Bun.sleep(timeoutMs).then(() => undefined) + ]); + + if (pid !== undefined) { + return pid; + } + + // Timeout reached, fall back to file polling + this.logger.warn( + 'PID pipe read timed out, falling back to file polling', + { + pidPipe, + pidFile, + timeoutMs + } + ); + } catch (error) { + // FIFO read failed, fall back to file polling + this.logger.warn('PID pipe read failed, falling back to file polling', { + pidPipe, + pidFile, + error: error instanceof Error ? error.message : String(error) + }); + } finally { + // Clean up the pipe + try { + await rm(pidPipe, { force: true }); + } catch { + // Ignore cleanup errors + } + } + + // Fallback: poll the PID file (less reliable but works) + return this.waitForPidFile(pidFile, 1000); + } + + /** + * Read PID from a FIFO (named pipe) + * This blocks until the shell writes the PID + * + * Uses Node.js fs.open which properly handles FIFOs - the open() call + * blocks until a writer opens the pipe, then we read the content. + */ + private async readPidFromPipe(pidPipe: string): Promise { + // Open the FIFO for reading - this blocks until a writer opens it + const fd = await open(pidPipe, 'r'); + try { + // Read content from the FIFO + const buffer = Buffer.alloc(64); + const { bytesRead } = await fd.read(buffer, 0, 64, null); + const content = buffer.toString('utf8', 0, bytesRead).trim(); + const pid = parseInt(content, 10); + return Number.isNaN(pid) ? undefined : pid; + } finally { + await fd.close(); + } + } + /** * Escape shell path for safe usage in bash scripts */ From ead029cdd4c698edf5655b09cb270e3ae65358ff Mon Sep 17 00:00:00 2001 From: katereznykova Date: Thu, 11 Dec 2025 17:40:31 +0000 Subject: [PATCH 5/7] revert circuit and queue changes --- packages/sandbox/src/clients/base-client.ts | 65 +-- .../sandbox/src/clients/circuit-breaker.ts | 237 --------- packages/sandbox/src/clients/index.ts | 15 +- packages/sandbox/src/clients/request-queue.ts | 244 --------- .../sandbox/src/clients/sandbox-client.ts | 67 +-- packages/sandbox/src/clients/types.ts | 38 -- packages/sandbox/src/index.ts | 10 - .../sandbox/tests/circuit-breaker.test.ts | 273 ---------- packages/sandbox/tests/request-queue.test.ts | 503 ------------------ 9 files changed, 9 insertions(+), 1443 deletions(-) delete mode 100644 packages/sandbox/src/clients/circuit-breaker.ts delete mode 100644 packages/sandbox/src/clients/request-queue.ts delete mode 100644 packages/sandbox/tests/circuit-breaker.test.ts delete mode 100644 packages/sandbox/tests/request-queue.test.ts diff --git a/packages/sandbox/src/clients/base-client.ts b/packages/sandbox/src/clients/base-client.ts index 93a85086..d9e0242c 100644 --- a/packages/sandbox/src/clients/base-client.ts +++ b/packages/sandbox/src/clients/base-client.ts @@ -1,10 +1,9 @@ import type { Logger } from '@repo/shared'; import { createNoOpLogger } from '@repo/shared'; +import { getHttpStatus } from '@repo/shared/errors'; import type { ErrorResponse as NewErrorResponse } from '../errors'; import { createErrorFromResponse, ErrorCode } from '../errors'; import type { SandboxError } from '../errors/classes'; -import type { CircuitBreaker } from './circuit-breaker'; -import type { RequestQueue } from './request-queue'; import type { HttpClientOptions, ResponseHandler } from './types'; // Container startup retry configuration @@ -18,54 +17,18 @@ export abstract class BaseHttpClient { protected baseUrl: string; protected options: HttpClientOptions; protected logger: Logger; - protected circuitBreaker?: CircuitBreaker; - protected requestQueue?: RequestQueue; constructor(options: HttpClientOptions = {}) { this.options = options; this.logger = options.logger ?? createNoOpLogger(); this.baseUrl = this.options.baseUrl!; - - // Use injected instances (shared across all clients in SandboxClient) - this.circuitBreaker = options._circuitBreaker; - this.requestQueue = options._requestQueue; } /** - * Core HTTP request method with resilience features - * - * Request flow: - * 1. Circuit breaker check - fail fast if circuit is open - * 2. Request queue - wait if at concurrency limit - * 3. Execute fetch with container startup retry logic - * 4. Record success/failure for circuit breaker - */ - protected async doFetch( - path: string, - options?: RequestInit - ): Promise { - // Step 1: Check circuit breaker (fail fast if open) - if (this.circuitBreaker && !this.circuitBreaker.canExecute()) { - const stats = this.circuitBreaker.getStats(); - const { CircuitOpenError } = await import('./circuit-breaker'); - throw new CircuitOpenError(stats.remainingRecoveryMs ?? 0); - } - - // Step 2: Execute through request queue if available - const executeRequest = () => this.doFetchWithRetry(path, options); - - if (this.requestQueue) { - return this.requestQueue.execute(executeRequest); - } - - return executeRequest(); - } - - /** - * Internal fetch with container startup retry logic + * Core HTTP request method with automatic retry for container startup delays * Retries both 503 (provisioning) and 500 (startup failure) errors when they're container-related */ - private async doFetchWithRetry( + protected async doFetch( path: string, options?: RequestInit ): Promise { @@ -73,15 +36,7 @@ export abstract class BaseHttpClient { let attempt = 0; while (true) { - let response: Response; - - try { - response = await this.executeFetch(path, options); - } catch (error) { - // Network error - record as failure for circuit breaker - this.circuitBreaker?.recordFailure(); - throw error; - } + const response = await this.executeFetch(path, options); // Check if this is a retryable container error (both 500 and 503) const shouldRetry = await this.isRetryableContainerError(response); @@ -107,25 +62,17 @@ export abstract class BaseHttpClient { continue; } - // Timeout exhausted - record as failure + // Timeout exhausted this.logger.error( 'Container failed to become ready', new Error( `Failed after ${attempt + 1} attempts over ${Math.floor(elapsed / 1000)}s` ) ); - this.circuitBreaker?.recordFailure(); return response; } - // Record success/failure for circuit breaker based on response status - if (response.ok) { - this.circuitBreaker?.recordSuccess(); - } else if (response.status >= 500) { - // Only count server errors as failures for circuit breaker - this.circuitBreaker?.recordFailure(); - } - + // Not a retryable error or request succeeded return response; } } diff --git a/packages/sandbox/src/clients/circuit-breaker.ts b/packages/sandbox/src/clients/circuit-breaker.ts deleted file mode 100644 index de10fa1e..00000000 --- a/packages/sandbox/src/clients/circuit-breaker.ts +++ /dev/null @@ -1,237 +0,0 @@ -/** - * Circuit breaker states - */ -export type CircuitState = 'closed' | 'open' | 'half-open'; - -/** - * Configuration options for the circuit breaker - */ -export interface CircuitBreakerOptions { - /** - * Number of failures in the window before opening the circuit - * @default 5 - */ - failureThreshold?: number; - - /** - * Time window in milliseconds for counting failures - * @default 30000 (30 seconds) - */ - failureWindow?: number; - - /** - * Time in milliseconds to wait before attempting recovery (half-open state) - * @default 10000 (10 seconds) - */ - recoveryTimeout?: number; - - /** - * Number of successful requests needed in half-open state to close the circuit - * @default 2 - */ - successThreshold?: number; - - /** - * Callback when circuit state changes - */ - onStateChange?: (state: CircuitState, previousState: CircuitState) => void; -} - -/** - * Error thrown when circuit is open and requests are rejected - */ -export class CircuitOpenError extends Error { - readonly name = 'CircuitOpenError'; - readonly remainingMs: number; - - constructor(remainingMs: number) { - super( - `Circuit breaker is open. Service is unavailable. Retry after ${Math.ceil(remainingMs / 1000)}s.` - ); - this.remainingMs = remainingMs; - } -} - -/** - * Circuit breaker implementation to protect against cascading failures - * - * States: - * - CLOSED: Normal operation, requests pass through - * - OPEN: Failures exceeded threshold, requests fail fast - * - HALF-OPEN: Testing if service recovered, limited requests allowed - * - * The circuit breaker tracks failures in a sliding window. When failures - * exceed the threshold, it opens and rejects requests immediately. After - * a recovery timeout, it enters half-open state to test if the service - * has recovered. - */ -export class CircuitBreaker { - private state: CircuitState = 'closed'; - private failures: number[] = []; // Timestamps of recent failures - private successCount = 0; // Successes in half-open state - private openedAt = 0; // When circuit opened - private readonly options: Required< - Omit - > & { - onStateChange?: CircuitBreakerOptions['onStateChange']; - }; - - constructor(options: CircuitBreakerOptions = {}) { - this.options = { - failureThreshold: options.failureThreshold ?? 5, - failureWindow: options.failureWindow ?? 30_000, - recoveryTimeout: options.recoveryTimeout ?? 10_000, - successThreshold: options.successThreshold ?? 2, - onStateChange: options.onStateChange - }; - } - - /** - * Get current circuit state - */ - getState(): CircuitState { - return this.state; - } - - /** - * Check if a request should be allowed through - * @returns true if request can proceed, false if circuit is open - * @throws CircuitOpenError if circuit is open (prefer using canExecute for non-throwing check) - */ - canExecute(): boolean { - this.cleanupOldFailures(); - - switch (this.state) { - case 'closed': - return true; - - case 'open': { - const elapsed = Date.now() - this.openedAt; - if (elapsed >= this.options.recoveryTimeout) { - // Transition to half-open - this.setState('half-open'); - return true; - } - return false; - } - - case 'half-open': - // Allow limited requests through for testing - return true; - } - } - - /** - * Execute a request through the circuit breaker - * Tracks success/failure and manages state transitions - */ - async execute(fn: () => Promise): Promise { - if (!this.canExecute()) { - const remaining = - this.options.recoveryTimeout - (Date.now() - this.openedAt); - throw new CircuitOpenError(remaining); - } - - try { - const result = await fn(); - this.recordSuccess(); - return result; - } catch (error) { - this.recordFailure(); - throw error; - } - } - - /** - * Record a successful request - */ - recordSuccess(): void { - if (this.state === 'half-open') { - this.successCount++; - if (this.successCount >= this.options.successThreshold) { - // Service has recovered - this.setState('closed'); - this.failures = []; - this.successCount = 0; - } - } - } - - /** - * Record a failed request - */ - recordFailure(): void { - const now = Date.now(); - this.failures.push(now); - this.cleanupOldFailures(); - - if (this.state === 'half-open') { - // Any failure in half-open immediately reopens - this.setState('open'); - this.openedAt = now; - this.successCount = 0; - } else if (this.state === 'closed') { - if (this.failures.length >= this.options.failureThreshold) { - this.setState('open'); - this.openedAt = now; - } - } - } - - /** - * Reset the circuit breaker to closed state - * Useful for manual intervention or testing - */ - reset(): void { - this.setState('closed'); - this.failures = []; - this.successCount = 0; - this.openedAt = 0; - } - - /** - * Get statistics about the circuit breaker - */ - getStats(): { - state: CircuitState; - failureCount: number; - successCount: number; - remainingRecoveryMs: number | null; - } { - this.cleanupOldFailures(); - - let remainingRecoveryMs: number | null = null; - if (this.state === 'open') { - remainingRecoveryMs = Math.max( - 0, - this.options.recoveryTimeout - (Date.now() - this.openedAt) - ); - } - - return { - state: this.state, - failureCount: this.failures.length, - successCount: this.successCount, - remainingRecoveryMs - }; - } - - /** - * Remove failures outside the sliding window - */ - private cleanupOldFailures(): void { - const cutoff = Date.now() - this.options.failureWindow; - this.failures = this.failures.filter((timestamp) => timestamp > cutoff); - } - - /** - * Set state and notify listener - */ - private setState(newState: CircuitState): void { - if (this.state !== newState) { - const previousState = this.state; - this.state = newState; - this.options.onStateChange?.(newState, previousState); - } - } -} diff --git a/packages/sandbox/src/clients/index.ts b/packages/sandbox/src/clients/index.ts index 72231c43..840ef1ce 100644 --- a/packages/sandbox/src/clients/index.ts +++ b/packages/sandbox/src/clients/index.ts @@ -1,14 +1,8 @@ // Main client exports -// Resilience exports -export { - CircuitBreaker, - type CircuitBreakerOptions, - CircuitOpenError, - type CircuitState -} from './circuit-breaker'; // Command client types export type { ExecuteRequest, ExecuteResponse } from './command-client'; + // Domain-specific clients export { CommandClient } from './command-client'; // File client types @@ -46,12 +40,6 @@ export type { StartProcessRequest } from './process-client'; export { ProcessClient } from './process-client'; -export { - QueueFullError, - QueueTimeoutError, - RequestQueue, - type RequestQueueOptions -} from './request-queue'; export { SandboxClient } from './sandbox-client'; // Types and interfaces export type { @@ -60,7 +48,6 @@ export type { ErrorResponse, HttpClientOptions, RequestConfig, - ResilienceOptions, ResponseHandler, SessionRequest } from './types'; diff --git a/packages/sandbox/src/clients/request-queue.ts b/packages/sandbox/src/clients/request-queue.ts deleted file mode 100644 index 11c57dc2..00000000 --- a/packages/sandbox/src/clients/request-queue.ts +++ /dev/null @@ -1,244 +0,0 @@ -/** - * Configuration options for the request queue - */ -export interface RequestQueueOptions { - /** - * Maximum number of concurrent requests allowed - * @default 10 - */ - maxConcurrent?: number; - - /** - * Maximum number of requests waiting in queue - * When exceeded, oldest requests are rejected - * @default 100 - */ - maxQueueSize?: number; - - /** - * Timeout for requests waiting in queue (milliseconds) - * @default 30000 (30 seconds) - */ - queueTimeout?: number; - - /** - * Callback when a request is queued - */ - onQueued?: (queueLength: number) => void; - - /** - * Callback when a request is dequeued - */ - onDequeued?: (waitTime: number) => void; -} - -/** - * Error thrown when queue is full and request is rejected - */ -export class QueueFullError extends Error { - readonly name = 'QueueFullError'; - readonly queueSize: number; - - constructor(queueSize: number) { - super( - `Request queue is full (${queueSize} pending requests). Service is overloaded.` - ); - this.queueSize = queueSize; - } -} - -/** - * Error thrown when request times out waiting in queue - */ -export class QueueTimeoutError extends Error { - readonly name = 'QueueTimeoutError'; - readonly waitTime: number; - - constructor(waitTime: number) { - super( - `Request timed out after ${Math.ceil(waitTime / 1000)}s waiting in queue.` - ); - this.waitTime = waitTime; - } -} - -/** - * Queued request with its resolver - */ -interface QueuedRequest { - execute: () => Promise; - resolve: (value: T) => void; - reject: (error: unknown) => void; - queuedAt: number; - timeoutId?: ReturnType; -} - -/** - * Request queue with concurrency limiting - * - * Provides backpressure by limiting the number of concurrent requests - * and queuing excess requests. This smooths out traffic bursts and - * prevents overwhelming downstream services. - * - * Features: - * - Configurable concurrency limit - * - Queue size limit to prevent memory exhaustion - * - Timeout for queued requests - * - FIFO ordering for fairness - */ -export class RequestQueue { - private readonly options: Required< - Omit - > & { - onQueued?: RequestQueueOptions['onQueued']; - onDequeued?: RequestQueueOptions['onDequeued']; - }; - private activeCount = 0; - private readonly queue: QueuedRequest[] = []; - - constructor(options: RequestQueueOptions = {}) { - this.options = { - maxConcurrent: options.maxConcurrent ?? 10, - maxQueueSize: options.maxQueueSize ?? 100, - queueTimeout: options.queueTimeout ?? 30_000, - onQueued: options.onQueued, - onDequeued: options.onDequeued - }; - } - - /** - * Execute a request through the queue - * Returns immediately if under concurrency limit, otherwise queues - */ - async execute(fn: () => Promise): Promise { - // Fast path: under concurrency limit - if (this.activeCount < this.options.maxConcurrent) { - return this.executeNow(fn); - } - - // Need to queue - return this.enqueue(fn); - } - - /** - * Execute immediately, tracking active count - */ - private async executeNow(fn: () => Promise): Promise { - this.activeCount++; - - try { - return await fn(); - } finally { - this.activeCount--; - this.processQueue(); - } - } - - /** - * Add request to queue - */ - private enqueue(fn: () => Promise): Promise { - // Check queue size limit - if (this.queue.length >= this.options.maxQueueSize) { - throw new QueueFullError(this.queue.length); - } - - return new Promise((resolve, reject) => { - const queuedAt = Date.now(); - - const request: QueuedRequest = { - execute: fn, - resolve: resolve as (value: unknown) => void, - reject, - queuedAt - }; - - // Set up timeout - request.timeoutId = setTimeout(() => { - // Remove from queue - const index = this.queue.indexOf(request as QueuedRequest); - if (index !== -1) { - this.queue.splice(index, 1); - reject(new QueueTimeoutError(Date.now() - queuedAt)); - } - }, this.options.queueTimeout); - - this.queue.push(request as QueuedRequest); - this.options.onQueued?.(this.queue.length); - }); - } - - /** - * Process next request from queue if capacity available - */ - private processQueue(): void { - if (this.queue.length === 0) { - return; - } - - if (this.activeCount >= this.options.maxConcurrent) { - return; - } - - const request = this.queue.shift(); - if (!request) { - return; - } - - // Clear timeout - if (request.timeoutId) { - clearTimeout(request.timeoutId); - } - - const waitTime = Date.now() - request.queuedAt; - this.options.onDequeued?.(waitTime); - - // Execute the request - this.executeNow(request.execute) - .then(request.resolve) - .catch(request.reject); - } - - /** - * Get current queue statistics - */ - getStats(): { - activeCount: number; - queueLength: number; - maxConcurrent: number; - maxQueueSize: number; - } { - return { - activeCount: this.activeCount, - queueLength: this.queue.length, - maxConcurrent: this.options.maxConcurrent, - maxQueueSize: this.options.maxQueueSize - }; - } - - /** - * Check if queue has capacity for new requests - */ - hasCapacity(): boolean { - return ( - this.activeCount < this.options.maxConcurrent || - this.queue.length < this.options.maxQueueSize - ); - } - - /** - * Clear the queue, rejecting all pending requests - * @param reason - Error message for rejected requests - */ - clear(reason = 'Queue cleared'): void { - while (this.queue.length > 0) { - const request = this.queue.shift(); - if (request) { - if (request.timeoutId) { - clearTimeout(request.timeoutId); - } - request.reject(new Error(reason)); - } - } - } -} diff --git a/packages/sandbox/src/clients/sandbox-client.ts b/packages/sandbox/src/clients/sandbox-client.ts index 1387a5e2..fa1599eb 100644 --- a/packages/sandbox/src/clients/sandbox-client.ts +++ b/packages/sandbox/src/clients/sandbox-client.ts @@ -1,20 +1,15 @@ -import { CircuitBreaker } from './circuit-breaker'; import { CommandClient } from './command-client'; import { FileClient } from './file-client'; import { GitClient } from './git-client'; import { InterpreterClient } from './interpreter-client'; import { PortClient } from './port-client'; import { ProcessClient } from './process-client'; -import { RequestQueue } from './request-queue'; import type { HttpClientOptions } from './types'; import { UtilityClient } from './utility-client'; /** * Main sandbox client that composes all domain-specific clients * Provides organized access to all sandbox functionality - * - * Resilience features (circuit breaker, request queue) are shared across - * all domain clients to provide coordinated protection against overload. */ export class SandboxClient { public readonly commands: CommandClient; @@ -25,40 +20,11 @@ export class SandboxClient { public readonly interpreter: InterpreterClient; public readonly utils: UtilityClient; - /** Shared circuit breaker instance */ - public readonly circuitBreaker?: CircuitBreaker; - - /** Shared request queue instance */ - public readonly requestQueue?: RequestQueue; - constructor(options: HttpClientOptions) { - // Create shared resilience instances based on configuration - const resilience = options.resilience; - - // Create circuit breaker unless explicitly disabled - if (resilience?.circuitBreaker !== false) { - this.circuitBreaker = new CircuitBreaker( - typeof resilience?.circuitBreaker === 'object' - ? resilience.circuitBreaker - : undefined - ); - } - - // Create request queue unless explicitly disabled - if (resilience?.requestQueue !== false) { - this.requestQueue = new RequestQueue( - typeof resilience?.requestQueue === 'object' - ? resilience.requestQueue - : undefined - ); - } - - // Build client options with shared instances + // Ensure baseUrl is provided for all clients const clientOptions: HttpClientOptions = { baseUrl: 'http://localhost:3000', - ...options, - _circuitBreaker: this.circuitBreaker, - _requestQueue: this.requestQueue + ...options }; // Initialize all domain clients with shared options @@ -70,33 +36,4 @@ export class SandboxClient { this.interpreter = new InterpreterClient(clientOptions); this.utils = new UtilityClient(clientOptions); } - - /** - * Get current resilience statistics - */ - getResilienceStats(): { - circuitBreaker?: ReturnType; - requestQueue?: ReturnType; - } { - return { - circuitBreaker: this.circuitBreaker?.getStats(), - requestQueue: this.requestQueue?.getStats() - }; - } - - /** - * Reset circuit breaker to closed state - * Useful for manual intervention after fixing issues - */ - resetCircuitBreaker(): void { - this.circuitBreaker?.reset(); - } - - /** - * Clear the request queue, rejecting all pending requests - * @param reason - Error message for rejected requests - */ - clearRequestQueue(reason?: string): void { - this.requestQueue?.clear(reason); - } } diff --git a/packages/sandbox/src/clients/types.ts b/packages/sandbox/src/clients/types.ts index c4566a18..b59d8af3 100644 --- a/packages/sandbox/src/clients/types.ts +++ b/packages/sandbox/src/clients/types.ts @@ -1,6 +1,4 @@ import type { Logger } from '@repo/shared'; -import type { CircuitBreaker, CircuitBreakerOptions } from './circuit-breaker'; -import type { RequestQueue, RequestQueueOptions } from './request-queue'; /** * Minimal interface for container fetch functionality @@ -13,24 +11,6 @@ export interface ContainerStub { ): Promise; } -/** - * Resilience configuration for the HTTP client - * Controls circuit breaker and request queue behavior - */ -export interface ResilienceOptions { - /** - * Circuit breaker configuration - * Set to false to disable circuit breaker entirely - */ - circuitBreaker?: CircuitBreakerOptions | false; - - /** - * Request queue configuration - * Set to false to disable request queuing entirely - */ - requestQueue?: RequestQueueOptions | false; -} - /** * Shared HTTP client configuration options */ @@ -47,24 +27,6 @@ export interface HttpClientOptions { command: string ) => void; onError?: (error: string, command?: string) => void; - - /** - * Resilience configuration (circuit breaker, request queue) - * These components are shared across all domain clients - */ - resilience?: ResilienceOptions; - - /** - * Shared circuit breaker instance (injected by SandboxClient) - * @internal - */ - _circuitBreaker?: CircuitBreaker; - - /** - * Shared request queue instance (injected by SandboxClient) - * @internal - */ - _requestQueue?: RequestQueue; } /** diff --git a/packages/sandbox/src/index.ts b/packages/sandbox/src/index.ts index e663dcf4..ea964d0e 100644 --- a/packages/sandbox/src/index.ts +++ b/packages/sandbox/src/index.ts @@ -1,22 +1,12 @@ // Export the main Sandbox class and utilities // Export the new client architecture -// Export resilience components for advanced usage export { - CircuitBreaker, - type CircuitBreakerOptions, - CircuitOpenError, - type CircuitState, CommandClient, FileClient, GitClient, PortClient, ProcessClient, - QueueFullError, - QueueTimeoutError, - RequestQueue, - type RequestQueueOptions, - type ResilienceOptions, SandboxClient, UtilityClient } from './clients'; diff --git a/packages/sandbox/tests/circuit-breaker.test.ts b/packages/sandbox/tests/circuit-breaker.test.ts deleted file mode 100644 index 988402e0..00000000 --- a/packages/sandbox/tests/circuit-breaker.test.ts +++ /dev/null @@ -1,273 +0,0 @@ -import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; -import { - CircuitBreaker, - CircuitOpenError, - type CircuitState -} from '../src/clients/circuit-breaker'; - -describe('CircuitBreaker', () => { - let circuitBreaker: CircuitBreaker; - - beforeEach(() => { - vi.useFakeTimers(); - circuitBreaker = new CircuitBreaker({ - failureThreshold: 3, - failureWindow: 10_000, // 10 seconds - recoveryTimeout: 5_000, // 5 seconds - successThreshold: 2 - }); - }); - - afterEach(() => { - vi.useRealTimers(); - }); - - describe('initial state', () => { - it('should start in closed state', () => { - expect(circuitBreaker.getState()).toBe('closed'); - }); - - it('should allow requests when closed', () => { - expect(circuitBreaker.canExecute()).toBe(true); - }); - - it('should have correct initial stats', () => { - const stats = circuitBreaker.getStats(); - expect(stats.state).toBe('closed'); - expect(stats.failureCount).toBe(0); - expect(stats.successCount).toBe(0); - expect(stats.remainingRecoveryMs).toBeNull(); - }); - }); - - describe('failure tracking', () => { - it('should track failures', () => { - circuitBreaker.recordFailure(); - circuitBreaker.recordFailure(); - - const stats = circuitBreaker.getStats(); - expect(stats.failureCount).toBe(2); - expect(stats.state).toBe('closed'); // Still below threshold - }); - - it('should open circuit when failure threshold is reached', () => { - circuitBreaker.recordFailure(); - circuitBreaker.recordFailure(); - circuitBreaker.recordFailure(); - - expect(circuitBreaker.getState()).toBe('open'); - expect(circuitBreaker.canExecute()).toBe(false); - }); - - it('should clear old failures outside the window', () => { - circuitBreaker.recordFailure(); - circuitBreaker.recordFailure(); - - // Advance past the failure window - vi.advanceTimersByTime(11_000); - - const stats = circuitBreaker.getStats(); - expect(stats.failureCount).toBe(0); - }); - - it('should not open circuit if failures are spread across windows', () => { - circuitBreaker.recordFailure(); - circuitBreaker.recordFailure(); - - vi.advanceTimersByTime(11_000); // Past first failures - - circuitBreaker.recordFailure(); - circuitBreaker.recordFailure(); - - // Only 2 failures in current window, below threshold - expect(circuitBreaker.getState()).toBe('closed'); - }); - }); - - describe('state transitions', () => { - it('should transition to half-open after recovery timeout', () => { - // Open the circuit - circuitBreaker.recordFailure(); - circuitBreaker.recordFailure(); - circuitBreaker.recordFailure(); - - expect(circuitBreaker.getState()).toBe('open'); - - // Advance past recovery timeout - vi.advanceTimersByTime(6_000); - - // Check canExecute triggers transition to half-open - expect(circuitBreaker.canExecute()).toBe(true); - expect(circuitBreaker.getState()).toBe('half-open'); - }); - - it('should close circuit after success threshold in half-open', () => { - // Open the circuit - circuitBreaker.recordFailure(); - circuitBreaker.recordFailure(); - circuitBreaker.recordFailure(); - - // Wait for recovery - vi.advanceTimersByTime(6_000); - circuitBreaker.canExecute(); // Triggers half-open - - // Record successes - circuitBreaker.recordSuccess(); - expect(circuitBreaker.getState()).toBe('half-open'); - - circuitBreaker.recordSuccess(); - expect(circuitBreaker.getState()).toBe('closed'); - }); - - it('should reopen circuit on failure in half-open state', () => { - // Open the circuit - circuitBreaker.recordFailure(); - circuitBreaker.recordFailure(); - circuitBreaker.recordFailure(); - - // Wait for recovery - vi.advanceTimersByTime(6_000); - circuitBreaker.canExecute(); // Triggers half-open - - // Record a failure - circuitBreaker.recordFailure(); - - expect(circuitBreaker.getState()).toBe('open'); - }); - - it('should notify on state changes', () => { - const transitions: Array<{ from: CircuitState; to: CircuitState }> = []; - - circuitBreaker = new CircuitBreaker({ - failureThreshold: 2, - recoveryTimeout: 5_000, - successThreshold: 1, - onStateChange: (to, from) => { - transitions.push({ from, to }); - } - }); - - // Trigger closed -> open - circuitBreaker.recordFailure(); - circuitBreaker.recordFailure(); - - expect(transitions).toHaveLength(1); - expect(transitions[0]).toEqual({ from: 'closed', to: 'open' }); - - // Trigger open -> half-open - vi.advanceTimersByTime(6_000); - circuitBreaker.canExecute(); - - expect(transitions).toHaveLength(2); - expect(transitions[1]).toEqual({ from: 'open', to: 'half-open' }); - - // Trigger half-open -> closed - circuitBreaker.recordSuccess(); - - expect(transitions).toHaveLength(3); - expect(transitions[2]).toEqual({ from: 'half-open', to: 'closed' }); - }); - }); - - describe('execute method', () => { - it('should execute function when circuit is closed', async () => { - const fn = vi.fn().mockResolvedValue('result'); - - const result = await circuitBreaker.execute(fn); - - expect(result).toBe('result'); - expect(fn).toHaveBeenCalled(); - }); - - it('should record success on successful execution', async () => { - const fn = vi.fn().mockResolvedValue('result'); - - await circuitBreaker.execute(fn); - - // Open circuit first to test half-open behavior - circuitBreaker.recordFailure(); - circuitBreaker.recordFailure(); - circuitBreaker.recordFailure(); - - vi.advanceTimersByTime(6_000); - circuitBreaker.canExecute(); // half-open - - await circuitBreaker.execute(fn); - await circuitBreaker.execute(fn); - - expect(circuitBreaker.getState()).toBe('closed'); - }); - - it('should record failure on failed execution', async () => { - const fn = vi.fn().mockRejectedValue(new Error('fail')); - - await expect(circuitBreaker.execute(fn)).rejects.toThrow('fail'); - await expect(circuitBreaker.execute(fn)).rejects.toThrow('fail'); - await expect(circuitBreaker.execute(fn)).rejects.toThrow('fail'); - - expect(circuitBreaker.getState()).toBe('open'); - }); - - it('should throw CircuitOpenError when circuit is open', async () => { - // Open the circuit - circuitBreaker.recordFailure(); - circuitBreaker.recordFailure(); - circuitBreaker.recordFailure(); - - const fn = vi.fn().mockResolvedValue('result'); - - await expect(circuitBreaker.execute(fn)).rejects.toThrow( - CircuitOpenError - ); - expect(fn).not.toHaveBeenCalled(); - }); - - it('should include remaining recovery time in CircuitOpenError', async () => { - // Open the circuit - circuitBreaker.recordFailure(); - circuitBreaker.recordFailure(); - circuitBreaker.recordFailure(); - - // Advance partway through recovery - vi.advanceTimersByTime(2_000); - - const fn = vi.fn().mockResolvedValue('result'); - - try { - await circuitBreaker.execute(fn); - } catch (error) { - expect(error).toBeInstanceOf(CircuitOpenError); - expect((error as CircuitOpenError).remainingMs).toBeCloseTo(3_000, -2); - } - }); - }); - - describe('reset', () => { - it('should reset circuit to closed state', () => { - // Open the circuit - circuitBreaker.recordFailure(); - circuitBreaker.recordFailure(); - circuitBreaker.recordFailure(); - - expect(circuitBreaker.getState()).toBe('open'); - - circuitBreaker.reset(); - - expect(circuitBreaker.getState()).toBe('closed'); - expect(circuitBreaker.getStats().failureCount).toBe(0); - }); - }); - - describe('default options', () => { - it('should use sensible defaults', () => { - const defaultBreaker = new CircuitBreaker(); - - // Record 5 failures (default threshold) - for (let i = 0; i < 5; i++) { - defaultBreaker.recordFailure(); - } - - expect(defaultBreaker.getState()).toBe('open'); - }); - }); -}); diff --git a/packages/sandbox/tests/request-queue.test.ts b/packages/sandbox/tests/request-queue.test.ts deleted file mode 100644 index 65b3aa38..00000000 --- a/packages/sandbox/tests/request-queue.test.ts +++ /dev/null @@ -1,503 +0,0 @@ -import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; -import { - QueueFullError, - QueueTimeoutError, - RequestQueue -} from '../src/clients/request-queue'; - -describe('RequestQueue', () => { - let queue: RequestQueue; - - beforeEach(() => { - vi.useFakeTimers(); - queue = new RequestQueue({ - maxConcurrent: 2, - maxQueueSize: 3, - queueTimeout: 5_000 - }); - }); - - afterEach(() => { - vi.useRealTimers(); - }); - - describe('initial state', () => { - it('should have correct initial stats', () => { - const stats = queue.getStats(); - expect(stats.activeCount).toBe(0); - expect(stats.queueLength).toBe(0); - expect(stats.maxConcurrent).toBe(2); - expect(stats.maxQueueSize).toBe(3); - }); - - it('should have capacity when empty', () => { - expect(queue.hasCapacity()).toBe(true); - }); - }); - - describe('basic execution', () => { - it('should execute requests immediately when under concurrency limit', async () => { - const fn = vi.fn().mockResolvedValue('result'); - - const result = await queue.execute(fn); - - expect(result).toBe('result'); - expect(fn).toHaveBeenCalled(); - }); - - it('should track active count during execution', async () => { - let activeCountDuringExecution = -1; - - const fn = vi.fn().mockImplementation(async () => { - activeCountDuringExecution = queue.getStats().activeCount; - return 'result'; - }); - - await queue.execute(fn); - - expect(activeCountDuringExecution).toBe(1); - expect(queue.getStats().activeCount).toBe(0); - }); - - it('should allow concurrent requests up to limit', async () => { - const results: string[] = []; - const resolvers: Array<(value: string) => void> = []; - - // Start 2 concurrent requests (at limit) - const request1 = queue.execute( - () => - new Promise((resolve) => { - resolvers.push(resolve); - }) - ); - const request2 = queue.execute( - () => - new Promise((resolve) => { - resolvers.push(resolve); - }) - ); - - // Both should be executing - expect(queue.getStats().activeCount).toBe(2); - expect(queue.getStats().queueLength).toBe(0); - - // Resolve both - resolvers[0]('result1'); - resolvers[1]('result2'); - - results.push(await request1); - results.push(await request2); - - expect(results).toEqual(['result1', 'result2']); - }); - }); - - describe('queueing behavior', () => { - it('should queue requests when at concurrency limit', async () => { - const resolvers: Array<(value: string) => void> = []; - - // Fill up concurrent slots - queue.execute( - () => - new Promise((resolve) => { - resolvers.push(resolve); - }) - ); - queue.execute( - () => - new Promise((resolve) => { - resolvers.push(resolve); - }) - ); - - // This should be queued - const queued = queue.execute(() => Promise.resolve('queued')); - - expect(queue.getStats().activeCount).toBe(2); - expect(queue.getStats().queueLength).toBe(1); - - // Resolve one active request - resolvers[0]('first'); - - // Allow queued request to process - await vi.advanceTimersByTimeAsync(0); - - const result = await queued; - expect(result).toBe('queued'); - }); - - it('should process queue in FIFO order', async () => { - const results: number[] = []; - const resolvers: Array<(value: number) => void> = []; - - // Fill concurrent slots - const active1 = queue.execute( - () => - new Promise((resolve) => { - resolvers.push(resolve); - }) - ); - const active2 = queue.execute( - () => - new Promise((resolve) => { - resolvers.push(resolve); - }) - ); - - // Queue additional requests - const queued1 = queue.execute(async () => { - results.push(1); - return 1; - }); - const queued2 = queue.execute(async () => { - results.push(2); - return 2; - }); - const queued3 = queue.execute(async () => { - results.push(3); - return 3; - }); - - // Resolve active requests - resolvers[0](0); - resolvers[1](0); - - await active1; - await active2; - await vi.advanceTimersByTimeAsync(0); - await queued1; - await vi.advanceTimersByTimeAsync(0); - await queued2; - await vi.advanceTimersByTimeAsync(0); - await queued3; - - expect(results).toEqual([1, 2, 3]); - }); - - it('should notify when requests are queued', async () => { - const onQueued = vi.fn(); - queue = new RequestQueue({ - maxConcurrent: 1, - maxQueueSize: 5, - onQueued - }); - - const resolver: { resolve?: () => void } = {}; - - // Fill concurrent slot - queue.execute( - () => - new Promise((resolve) => { - resolver.resolve = resolve; - }) - ); - - // Queue a request - queue.execute(() => Promise.resolve()); - - expect(onQueued).toHaveBeenCalledWith(1); - - // Cleanup - resolver.resolve?.(); - }); - - it('should notify when requests are dequeued with wait time', async () => { - const onDequeued = vi.fn(); - queue = new RequestQueue({ - maxConcurrent: 1, - maxQueueSize: 5, - queueTimeout: 30_000, - onDequeued - }); - - const resolver: { resolve?: () => void } = {}; - - // Fill concurrent slot - queue.execute( - () => - new Promise((resolve) => { - resolver.resolve = resolve; - }) - ); - - // Queue a request - const queued = queue.execute(() => Promise.resolve()); - - // Wait some time - vi.advanceTimersByTime(1_000); - - // Release the active slot - resolver.resolve?.(); - - await vi.advanceTimersByTimeAsync(0); - await queued; - - expect(onDequeued).toHaveBeenCalled(); - expect(onDequeued.mock.calls[0][0]).toBeGreaterThanOrEqual(1_000); - }); - }); - - describe('queue limits', () => { - it('should throw QueueFullError when queue is full', async () => { - const resolvers: Array<() => void> = []; - - // Fill concurrent slots - queue.execute( - () => - new Promise((resolve) => { - resolvers.push(resolve); - }) - ); - queue.execute( - () => - new Promise((resolve) => { - resolvers.push(resolve); - }) - ); - - // Fill queue - queue.execute(() => Promise.resolve()); - queue.execute(() => Promise.resolve()); - queue.execute(() => Promise.resolve()); - - expect(queue.getStats().queueLength).toBe(3); - - // This should throw (queue is full) - await expect(queue.execute(() => Promise.resolve())).rejects.toThrow( - QueueFullError - ); - - // Cleanup - for (const r of resolvers) { - r(); - } - }); - - it('should include queue size in QueueFullError', () => { - // Fill concurrent slots - queue.execute(() => new Promise(() => {})); - queue.execute(() => new Promise(() => {})); - - // Fill queue - queue.execute(() => Promise.resolve()); - queue.execute(() => Promise.resolve()); - queue.execute(() => Promise.resolve()); - - try { - queue.execute(() => Promise.resolve()); - } catch (error) { - expect(error).toBeInstanceOf(QueueFullError); - expect((error as QueueFullError).queueSize).toBe(3); - } - }); - }); - - describe('queue timeout', () => { - it('should timeout queued requests after queueTimeout', async () => { - const resolver: { resolve?: () => void } = {}; - - // Fill concurrent slots - queue.execute( - () => - new Promise((resolve) => { - resolver.resolve = resolve; - }) - ); - queue.execute(() => new Promise(() => {})); - - // Queue a request - const queuedPromise = queue.execute(() => Promise.resolve('result')); - - // Advance past timeout - vi.advanceTimersByTime(6_000); - - await expect(queuedPromise).rejects.toThrow(QueueTimeoutError); - }); - - it('should include wait time in QueueTimeoutError', async () => { - // Fill concurrent slots - queue.execute(() => new Promise(() => {})); - queue.execute(() => new Promise(() => {})); - - // Queue a request - const queuedPromise = queue.execute(() => Promise.resolve()); - - // Advance past timeout - vi.advanceTimersByTime(6_000); - - try { - await queuedPromise; - } catch (error) { - expect(error).toBeInstanceOf(QueueTimeoutError); - expect((error as QueueTimeoutError).waitTime).toBeGreaterThanOrEqual( - 5_000 - ); - } - }); - - it('should remove timed-out requests from queue', async () => { - // Fill concurrent slots - queue.execute(() => new Promise(() => {})); - queue.execute(() => new Promise(() => {})); - - // Queue requests with different timeouts by using a longer timeout queue - const longTimeoutQueue = new RequestQueue({ - maxConcurrent: 2, - maxQueueSize: 5, - queueTimeout: 10_000 // Longer timeout - }); - - // Fill concurrent slots - longTimeoutQueue.execute(() => new Promise(() => {})); - longTimeoutQueue.execute(() => new Promise(() => {})); - - // Queue two requests - const queued1 = longTimeoutQueue.execute(() => Promise.resolve(1)); - longTimeoutQueue.execute(() => Promise.resolve(2)); - - expect(longTimeoutQueue.getStats().queueLength).toBe(2); - - // Timeout all requests (advance past the 10s timeout) - vi.advanceTimersByTime(11_000); - - // Both should have timed out - await expect(queued1).rejects.toThrow(QueueTimeoutError); - - // Both requests should have been removed - expect(longTimeoutQueue.getStats().queueLength).toBe(0); - }); - }); - - describe('error handling', () => { - it('should propagate errors from executed functions', async () => { - const fn = vi.fn().mockRejectedValue(new Error('execution failed')); - - await expect(queue.execute(fn)).rejects.toThrow('execution failed'); - }); - - it('should decrement active count on error', async () => { - const fn = vi.fn().mockRejectedValue(new Error('fail')); - - try { - await queue.execute(fn); - } catch { - // Expected - } - - expect(queue.getStats().activeCount).toBe(0); - }); - - it('should process queue after error in active request', async () => { - let resolver: { resolve?: () => void } = {}; - - // Fill one slot with a request that will fail - const failing = queue.execute( - () => - new Promise((_, reject) => { - resolver = { resolve: () => reject(new Error('fail')) }; - }) - ); - - // Fill another slot - queue.execute(() => new Promise(() => {})); - - // Queue a request - const queued = queue.execute(() => Promise.resolve('queued result')); - - // Fail the first request - resolver.resolve?.(); - - try { - await failing; - } catch { - // Expected - } - - await vi.advanceTimersByTimeAsync(0); - - const result = await queued; - expect(result).toBe('queued result'); - }); - }); - - describe('clear', () => { - it('should clear all queued requests', async () => { - // Fill concurrent slots - queue.execute(() => new Promise(() => {})); - queue.execute(() => new Promise(() => {})); - - // Queue requests - const queued1 = queue.execute(() => Promise.resolve()); - const queued2 = queue.execute(() => Promise.resolve()); - - expect(queue.getStats().queueLength).toBe(2); - - queue.clear('Test clear'); - - expect(queue.getStats().queueLength).toBe(0); - - await expect(queued1).rejects.toThrow('Test clear'); - await expect(queued2).rejects.toThrow('Test clear'); - }); - - it('should not affect active requests when clearing', async () => { - const resolver: { resolve?: (value: string) => void } = {}; - - // Start an active request - const active = queue.execute( - () => - new Promise((resolve) => { - resolver.resolve = resolve; - }) - ); - - // Queue a request - queue.execute(() => Promise.resolve()); - - queue.clear(); - - // Active request should still complete - resolver.resolve?.('active result'); - - const result = await active; - expect(result).toBe('active result'); - }); - }); - - describe('hasCapacity', () => { - it('should return true when under limits', () => { - expect(queue.hasCapacity()).toBe(true); - }); - - it('should return true when at concurrency limit but queue has space', () => { - // Fill concurrent slots - queue.execute(() => new Promise(() => {})); - queue.execute(() => new Promise(() => {})); - - expect(queue.hasCapacity()).toBe(true); - }); - - it('should return false when queue is full', () => { - // Fill concurrent slots - queue.execute(() => new Promise(() => {})); - queue.execute(() => new Promise(() => {})); - - // Fill queue - queue.execute(() => Promise.resolve()); - queue.execute(() => Promise.resolve()); - queue.execute(() => Promise.resolve()); - - expect(queue.hasCapacity()).toBe(false); - }); - }); - - describe('default options', () => { - it('should use sensible defaults', () => { - const defaultQueue = new RequestQueue(); - const stats = defaultQueue.getStats(); - - expect(stats.maxConcurrent).toBe(10); - expect(stats.maxQueueSize).toBe(100); - }); - }); -}); From 876c23318439074426b88d883867847e23e0e350 Mon Sep 17 00:00:00 2001 From: katereznykova Date: Thu, 11 Dec 2025 18:01:08 +0000 Subject: [PATCH 6/7] use wait for in the test --- packages/sandbox-container/src/session.ts | 2 +- tests/e2e/comprehensive-workflow.test.ts | 16 ++++++++++++++-- 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/packages/sandbox-container/src/session.ts b/packages/sandbox-container/src/session.ts index 5fba5cae..65f824b2 100644 --- a/packages/sandbox-container/src/session.ts +++ b/packages/sandbox-container/src/session.ts @@ -24,7 +24,7 @@ */ import { randomUUID } from 'node:crypto'; -import { createReadStream, watch } from 'node:fs'; +import { watch } from 'node:fs'; import { mkdir, open, rm, stat } from 'node:fs/promises'; import { tmpdir } from 'node:os'; import { basename, dirname, join } from 'node:path'; diff --git a/tests/e2e/comprehensive-workflow.test.ts b/tests/e2e/comprehensive-workflow.test.ts index 34728c7a..764f9ab3 100644 --- a/tests/e2e/comprehensive-workflow.test.ts +++ b/tests/e2e/comprehensive-workflow.test.ts @@ -298,8 +298,20 @@ const interval = setInterval(() => { expect(processData.id).toBeTruthy(); const processId = processData.id; - // Wait for process to complete - await new Promise((resolve) => setTimeout(resolve, 3000)); + // Wait for process to complete using waitForLog instead of fixed sleep + // This is more reliable under load as it waits for actual output + const waitResponse = await fetch( + `${workerUrl}/api/process/${processId}/waitForLog`, + { + method: 'POST', + headers, + body: JSON.stringify({ + pattern: 'Done', + timeout: 10000 + }) + } + ); + expect(waitResponse.status).toBe(200); // Get process logs const logsResponse = await fetch( From 8554f1cd7be90cfa19450c497f46585b4b4f1a72 Mon Sep 17 00:00:00 2001 From: whoiskatrin Date: Fri, 12 Dec 2025 09:44:25 +0000 Subject: [PATCH 7/7] Fix race condition for PID retrieval Updated the changeset to fix a race condition during PID retrieval. --- .changeset/clean-pans-switch.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/clean-pans-switch.md diff --git a/.changeset/clean-pans-switch.md b/.changeset/clean-pans-switch.md new file mode 100644 index 00000000..1437c7bc --- /dev/null +++ b/.changeset/clean-pans-switch.md @@ -0,0 +1,5 @@ +--- +"@cloudflare/sandbox": patch +--- + +fix race condition for PID retrieval