Skip to content
Merged
Show file tree
Hide file tree
Changes from 28 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
861224d
agent steaming
tkattkat Nov 21, 2025
1fa6093
put behind experimental
tkattkat Nov 22, 2025
a0c8ddf
update typing
tkattkat Nov 22, 2025
8d4f29d
add streaming example
tkattkat Nov 22, 2025
e9c63f6
lint
tkattkat Nov 22, 2025
f0bfffe
update types
tkattkat Nov 22, 2025
b21cd59
change default max steps to 20
tkattkat Nov 22, 2025
c5c60f1
Merge remote-tracking branch 'origin/main' into agent-steaming
tkattkat Nov 25, 2025
57ec8c1
manually bump version
tkattkat Nov 25, 2025
e31d2df
Update packages/core/examples/agent_stream_example.ts
tkattkat Nov 25, 2025
ef9a78e
changeset
tkattkat Nov 25, 2025
5b5619c
change example to verbose 0
tkattkat Nov 25, 2025
e782674
add caching
tkattkat Nov 26, 2025
1b7d126
format
tkattkat Nov 26, 2025
51877dd
add wrapStreamForCaching
tkattkat Nov 26, 2025
ebd814f
consolidate shared logic
tkattkat Nov 26, 2025
0d06078
remove version change
tkattkat Nov 26, 2025
eeee0dd
agents streaming
tkattkat Nov 26, 2025
7f88860
update example
tkattkat Nov 26, 2025
ae01423
update changeset
tkattkat Nov 26, 2025
269d7fc
update example
tkattkat Nov 26, 2025
1e7af15
greptile suggestions
tkattkat Nov 26, 2025
4ad4efc
format
tkattkat Nov 26, 2025
05bdf3c
throw error when stream is used with cua
tkattkat Nov 26, 2025
3744a71
add test for streaming
tkattkat Nov 26, 2025
c14d736
update test
tkattkat Nov 26, 2025
c1e3bae
lint
tkattkat Nov 26, 2025
62cf2b0
make anthropic key available in e2e local CI step
seanmcguire12 Nov 26, 2025
008bceb
add try catch on prepare agent
tkattkat Nov 26, 2025
276cafb
format
tkattkat Nov 26, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/four-knives-ask.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@browserbasehq/stagehand": patch
---

Add streaming support to agent via `stream: true` in the agent config
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ jobs:
runs-on: ubuntu-latest
timeout-minutes: 50
env:
ANTHROPIC_API_KEY: ${{ secrets.ANTHROPIC_API_KEY }}
HEADLESS: true
steps:
- name: Check out repository code
Expand Down
49 changes: 49 additions & 0 deletions packages/core/examples/agent_stream_example.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import { Stagehand } from "../lib/v3";
import dotenv from "dotenv";
import chalk from "chalk";

// Load environment variables (LLM API keys, etc.) from .env
dotenv.config();

/**
 * Agent streaming example: passing `stream: true` in the agent config makes
 * `execute()` return an AgentStreamResult whose async iterables
 * (`textStream`, `fullStream`) emit output while the agent runs.
 */
async function main(): Promise<void> {
  console.log(`\n${chalk.bold("Stagehand 🤘 Agent Streaming Example")}\n`);

  // Initialize Stagehand (streaming is currently behind the experimental flag)
  const stagehand = new Stagehand({
    env: "LOCAL",
    verbose: 0,
    cacheDir: "stagehand-agent-cache",
    logInferenceToFile: false,
    experimental: true,
  });

  await stagehand.init();

  try {
    const page = stagehand.context.pages()[0];
    await page.goto("https://amazon.com");

    // Create a streaming agent with stream: true in the config
    const agent = stagehand.agent({
      model: "anthropic/claude-sonnet-4-5-20250929",
      stream: true, // This makes execute() return AgentStreamResult
    });

    const agentRun = await agent.execute({
      instruction: "go to amazon, and search for shampoo, stop after searching",
      maxSteps: 20,
    });

    // Stream the text deltas as they arrive
    for await (const delta of agentRun.textStream) {
      process.stdout.write(delta);
    }

    // Or stream everything (tool calls, messages, etc.):
    // for await (const delta of agentRun.fullStream) {
    //   console.log(delta);
    // }

    const finalResult = await agentRun.result;
    console.log("Final Result:", finalResult);
  } catch (error) {
    console.log(`${chalk.red("✗")} Error: ${error}`);
  } finally {
    // Release the browser so the process exits cleanly
    await stagehand.close();
  }
}

// Surface failures from init() (which runs outside the try block above)
main().catch((error) => {
  console.error(error);
  process.exitCode = 1;
});
130 changes: 130 additions & 0 deletions packages/core/lib/v3/cache/AgentCache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import type {
import type {
AvailableModel,
AgentResult,
AgentStreamResult,
AgentConfig,
AgentExecuteOptions,
Logger,
Expand Down Expand Up @@ -185,6 +186,135 @@ export class AgentCache {
return await this.replayAgentCacheEntry(entry);
}

/**
 * Replays a cached agent execution, exposed through the streaming interface.
 *
 * The agent API has two execution modes:
 * - `execute()` - resolves directly to a Promise<AgentResult>
 * - `stream()` - yields an AgentStreamResult with async iterables for
 *   real-time output
 *
 * On a cache hit, each mode must receive its own shape: `tryReplay()` serves
 * the plain AgentResult, while this method wraps that same replayed result in
 * a stream-compatible interface. Consumers of `stream()` can therefore still
 * iterate `textStream` and await `result` exactly as they would for a live
 * run, regardless of whether the response came from cache.
 */
async tryReplayAsStream(
  context: AgentCacheContext,
): Promise<AgentStreamResult | null> {
  const replayed = await this.tryReplay(context);
  return replayed ? this.createCachedStreamResult(replayed) : null;
}

/**
 * Creates a mock AgentStreamResult that wraps a cached AgentResult.
 *
 * AgentStreamResult (from the AI SDK) is a complex type with multiple async
 * iterables and promises. When serving from cache, we don't have an actual
 * LLM stream to consume - we just have the final result. This method creates
 * a "fake" stream that emits the cached message as a single chunk and
 * resolves every promise-valued field immediately.
 *
 * This approach lets cached responses be transparent to the consumer -
 * they can use the same iteration patterns whether the result is live or cached.
 */
private createCachedStreamResult(
  cachedResult: AgentResult,
): AgentStreamResult {
  const message = cachedResult.message ?? "";

  // Yields the entire cached message as one text chunk.
  async function* textStreamGenerator(): AsyncGenerator<string> {
    yield message;
  }

  // Mimics the full event stream: a single text-delta followed by a finish event.
  async function* fullStreamGenerator(): AsyncGenerator<{
    type: string;
    textDelta?: string;
  }> {
    yield { type: "text-delta", textDelta: message };
    yield { type: "finish" };
  }

  const mockStreamResult = {
    textStream: textStreamGenerator(),
    fullStream: fullStreamGenerator(),
    result: Promise.resolve(cachedResult),
    text: Promise.resolve(message),
    // A cached replay consumes no model tokens, so usage reports zero.
    usage: Promise.resolve({
      promptTokens: 0,
      completionTokens: 0,
      totalTokens: 0,
    }),
    finishReason: Promise.resolve("stop" as const),
    experimental_providerMetadata: Promise.resolve(undefined),
    // Sentinel response metadata marking this as a cache hit.
    response: Promise.resolve({
      id: "cached",
      timestamp: new Date(),
      modelId: "cached",
    }),
    rawResponse: Promise.resolve({ headers: {} }),
    warnings: Promise.resolve([]),
    // NOTE(review): steps/toolCalls/toolResults are empty even when the cached
    // run used tools - consumers relying on this history will see none of it.
    steps: Promise.resolve([]),
    toolCalls: Promise.resolve([]),
    toolResults: Promise.resolve([]),
    // Supports `for await (const chunk of streamResult)` directly (text chunks).
    [Symbol.asyncIterator]: () => textStreamGenerator(),
    // Cast is deliberate: we only approximate the AI SDK stream result shape.
  } as unknown as AgentStreamResult;

  return mockStreamResult;
}

/**
 * Wraps an AgentStreamResult so the execution is cached when it completes.
 *
 * Streaming makes caching trickier than the plain promise path, so the
 * orchestration lives here rather than spreading across the V3 class:
 * 1. Starts recording agent replay steps immediately
 * 2. Replaces the stream's `result` promise with one that observes completion
 * 3. On success: finalizes the recording and persists the cache entry
 * 4. On error: discards the recording and rethrows the original error
 *
 * @param context - The cache context for this execution
 * @param streamResult - The stream result from the agent handler
 * @param beginRecording - Callback to start recording (from V3)
 * @param endRecording - Callback to end recording and get steps (from V3)
 * @param discardRecording - Callback to discard recording on error (from V3)
 * @returns The wrapped stream result with caching enabled
 */
wrapStreamForCaching(
  context: AgentCacheContext,
  streamResult: AgentStreamResult,
  beginRecording: () => void,
  endRecording: () => AgentReplayStep[],
  discardRecording: () => void,
): AgentStreamResult {
  beginRecording();

  const sourceResult = streamResult.result;
  streamResult.result = (async () => {
    let completed: AgentResult;
    try {
      completed = await sourceResult;
    } catch (error) {
      // The run failed; its partial recording must not be persisted.
      discardRecording();
      throw error;
    }

    const recordedSteps = endRecording();
    // Only successful runs with at least one step are worth replaying later.
    if (completed.success && recordedSteps.length > 0) {
      await this.store(context, recordedSteps, completed);
    }
    return completed;
  })();

  return streamResult;
}

async store(
context: AgentCacheContext,
steps: AgentReplayStep[],
Expand Down
Loading
Loading