Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/four-knives-ask.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@browserbasehq/stagehand": patch
---

Add streaming support to the agent via `agent.stream()`
47 changes: 47 additions & 0 deletions packages/core/examples/agent_stream_example.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import { Stagehand } from "../lib/v3";
import dotenv from "dotenv";
import chalk from "chalk";

// Load environment variables
dotenv.config();

/**
 * Demonstrates streaming agent output with Stagehand.
 *
 * Initializes a local Stagehand instance, runs an agent against amazon.com,
 * writes the text stream to stdout as deltas arrive, then logs the final
 * aggregated result.
 */
async function main(): Promise<void> {
  console.log(`\n${chalk.bold("Stagehand 🤘 Agent Streaming Example")}\n`);

  // Initialize Stagehand
  const stagehand = new Stagehand({
    env: "LOCAL",
    verbose: 0,
    cacheDir: "stagehand-agent-cache",
    logInferenceToFile: false,
    experimental: true,
  });

  try {
    // init() can reject (e.g. browser/launch failure); keep it inside the
    // try so its failure is reported like any other error below instead of
    // escaping as an unhandled rejection.
    await stagehand.init();

    const page = stagehand.context.pages()[0];
    await page.goto("https://amazon.com");

    const agent = stagehand.agent({
      model: "anthropic/claude-sonnet-4-5-20250929",
      executionModel: "google/gemini-2.5-flash",
    });

    const result = await agent.stream({
      instruction: "go to amazon, and search for shampoo, stop after searching",
      maxSteps: 20,
    });

    // Stream the text deltas as they arrive.
    for await (const delta of result.textStream) {
      process.stdout.write(delta);
    }

    // Stream everything (tool calls, messages, etc.):
    // for await (const delta of result.fullStream) {
    //   console.log(delta);
    // }

    const finalResult = await result.result;
    console.log("Final Result:", finalResult);
  } catch (error) {
    console.log(`${chalk.red("✗")} Error: ${error}`);
  }
}

// Surface any rejection that escapes main() rather than leaving a floating
// promise and an unhandled-rejection warning/crash.
main().catch((error) => {
  console.error(error);
  process.exitCode = 1;
});
130 changes: 130 additions & 0 deletions packages/core/lib/v3/cache/AgentCache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import type {
import type {
AvailableModel,
AgentResult,
AgentStreamResult,
AgentConfig,
AgentExecuteOptions,
Logger,
Expand Down Expand Up @@ -185,6 +186,135 @@ export class AgentCache {
return await this.replayAgentCacheEntry(entry);
}

/**
 * Replays a cached agent execution, presented as a stream.
 *
 * The agent API exposes two execution modes with different return shapes:
 * - `execute()` resolves to a plain AgentResult (served via `tryReplay()`).
 * - `stream()` returns an AgentStreamResult whose async iterables emit
 *   output incrementally.
 *
 * On a cache hit for the streaming mode, the stored AgentResult is wrapped
 * in a stream-shaped adapter so consumers can iterate `textStream` and await
 * `result` exactly as they would for a live run — the cache stays invisible
 * to callers regardless of which mode they chose.
 *
 * @param context - Cache lookup context for this execution
 * @returns A stream-compatible view of the cached result, or null on a miss
 */
async tryReplayAsStream(
  context: AgentCacheContext,
): Promise<AgentStreamResult | null> {
  const cached = await this.tryReplay(context);
  return cached ? this.createCachedStreamResult(cached) : null;
}

/**
 * Creates a mock AgentStreamResult that wraps a cached AgentResult.
 *
 * AgentStreamResult (from the AI SDK) is a complex type with multiple async
 * iterables and promises. When serving from cache, we don't have an actual
 * LLM stream to consume - we just have the final result. This method creates
 * a "fake" stream that emits the cached message as a single delta and
 * resolves every promise-valued field immediately.
 *
 * This approach lets cached responses be transparent to the consumer -
 * they can use the same iteration patterns whether the result is live or cached.
 */
private createCachedStreamResult(
  cachedResult: AgentResult,
): AgentStreamResult {
  const message = cachedResult.message ?? "";

  // Emits the entire cached message as one chunk, then completes.
  async function* textStreamGenerator(): AsyncGenerator<string> {
    yield message;
  }

  // Mirrors the full-stream shape: a single text delta followed by a
  // finish event.
  async function* fullStreamGenerator(): AsyncGenerator<{
    type: string;
    textDelta?: string;
  }> {
    yield { type: "text-delta", textDelta: message };
    yield { type: "finish" };
  }

  const mockStreamResult = {
    textStream: textStreamGenerator(),
    fullStream: fullStreamGenerator(),
    result: Promise.resolve(cachedResult),
    text: Promise.resolve(message),
    // No tokens are spent serving from cache, so usage reports zeros.
    usage: Promise.resolve({
      promptTokens: 0,
      completionTokens: 0,
      totalTokens: 0,
    }),
    finishReason: Promise.resolve("stop" as const),
    experimental_providerMetadata: Promise.resolve(undefined),
    // Sentinel metadata marking this as a cache-served response.
    response: Promise.resolve({
      id: "cached",
      timestamp: new Date(),
      modelId: "cached",
    }),
    rawResponse: Promise.resolve({ headers: {} }),
    warnings: Promise.resolve([]),
    steps: Promise.resolve([]),
    toolCalls: Promise.resolve([]),
    toolResults: Promise.resolve([]),
    // Iterating the result object itself yields the text stream. Note each
    // call produces a fresh generator, independent of `textStream` above.
    [Symbol.asyncIterator]: () => textStreamGenerator(),
    // NOTE(review): the cast is needed because this literal only approximates
    // the SDK's stream type; verify field names (e.g. usage token keys) still
    // match the AI SDK version in use.
  } as unknown as AgentStreamResult;

  return mockStreamResult;
}

/**
 * Adds caching behavior to a live AgentStreamResult.
 *
 * Recording begins immediately. The stream's `result` promise is then
 * replaced by one that, once the upstream run settles:
 * 1. On success: finalizes the recording and, when the run succeeded and
 *    produced at least one step, persists the cache entry.
 * 2. On failure: discards the recording and re-throws the error.
 *
 * Keeping this orchestration inside AgentCache avoids spreading cache
 * bookkeeping across the V3 class.
 *
 * @param context - The cache context for this execution
 * @param streamResult - The stream result from the agent handler
 * @param beginRecording - Callback to start recording (from V3)
 * @param endRecording - Callback to end recording and get steps (from V3)
 * @param discardRecording - Callback to discard recording on error (from V3)
 * @returns The same stream result, with its `result` promise wrapped
 */
wrapStreamForCaching(
  context: AgentCacheContext,
  streamResult: AgentStreamResult,
  beginRecording: () => void,
  endRecording: () => AgentReplayStep[],
  discardRecording: () => void,
): AgentStreamResult {
  beginRecording();

  const upstream = streamResult.result;
  streamResult.result = (async () => {
    let agentResult: AgentResult;
    try {
      agentResult = await upstream;
    } catch (streamError) {
      // A failed run must not poison the cache.
      discardRecording();
      throw streamError;
    }

    const recordedSteps = endRecording();
    // Only persist successful runs that actually produced steps.
    if (agentResult.success && recordedSteps.length > 0) {
      await this.store(context, recordedSteps, agentResult);
    }
    return agentResult;
  })();

  return streamResult;
}

async store(
context: AgentCacheContext,
steps: AgentReplayStep[],
Expand Down
Loading
Loading