Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions pkgs/dsl/__tests__/types/dsl-types.test-d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,24 @@ describe('Flow Type System Tests', () => {
});
});

describe('Empty dependsOn array handling', () => {
it('should reject empty dependsOn array at compile time', () => {
// Empty dependsOn: [] is semantically meaningless - if you have no dependencies,
// simply omit dependsOn. Allowing it creates a type mismatch where:
// - TypeScript infers deps as {} (empty object from never[])
// - Runtime treats it as a root step and passes flowInput
//
// This test verifies that dependsOn: [] is rejected at compile time.
//
// This is a type-level check: no runtime assertion is made here. The test holds
// as long as the @ts-expect-error directive below still suppresses a compile error;
// if `dependsOn: []` ever type-checks again, the unused directive itself becomes
// a compile error.
new Flow<{ userId: string }>({ slug: 'test_flow' })
.step({ slug: 'root' }, () => ({ value: 1 }))
// NOTE: keep the directive on the line directly above the offending `.step(...)`
// call - it only covers the next line, so reformatting this chain can break the test.
// @ts-expect-error - empty dependsOn array should be rejected
.step({ slug: 'bad_step', dependsOn: [] }, (deps) => {
// If this compiled, deps would be {} but runtime would pass { userId: string }
return { result: deps };
});
});
});

describe('Multi-level dependencies', () => {
it('should correctly type multi-level dependencies', () => {
new Flow<string>({ slug: 'test_flow' })
Expand Down
6 changes: 4 additions & 2 deletions pkgs/dsl/src/dsl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -431,12 +431,13 @@ export class Flow<
>;

// Overload 2: Dependent step (with dependsOn) - receives deps, flowInput via context
// Note: [Deps, ...Deps[]] requires at least one dependency - empty arrays are rejected at compile time
step<
Slug extends string,
Deps extends Extract<keyof Steps, string>,
TOutput
>(
opts: Simplify<{ slug: Slug extends keyof Steps ? never : Slug; dependsOn: Deps[] } & StepRuntimeOptions>,
opts: Simplify<{ slug: Slug extends keyof Steps ? never : Slug; dependsOn: [Deps, ...Deps[]] } & StepRuntimeOptions>,
handler: (
deps: { [K in Deps]: K extends keyof Steps ? Steps[K] : never },
context: FlowContext<TEnv, TFlowInput> & TContext
Expand Down Expand Up @@ -537,12 +538,13 @@ export class Flow<
>;

// Overload 2: Dependent array (with dependsOn) - receives deps, flowInput via context
// Note: [Deps, ...Deps[]] requires at least one dependency - empty arrays are rejected at compile time
array<
Slug extends string,
Deps extends Extract<keyof Steps, string>,
TOutput extends readonly any[]
>(
opts: Simplify<{ slug: Slug extends keyof Steps ? never : Slug; dependsOn: Deps[] } & StepRuntimeOptions>,
opts: Simplify<{ slug: Slug extends keyof Steps ? never : Slug; dependsOn: [Deps, ...Deps[]] } & StepRuntimeOptions>,
handler: (
deps: { [K in Deps]: K extends keyof Steps ? Steps[K] : never },
context: FlowContext<TEnv, TFlowInput> & TContext
Expand Down
4 changes: 2 additions & 2 deletions pkgs/edge-worker/src/flow/StepTaskExecutor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,8 @@ export class StepTaskExecutor<TFlow extends AnyFlow, TContext extends StepTaskHa
// Map steps: SQL already extracted element
handlerInput = this.stepTask.input;
} else if (stepDef.dependencies.length === 0) {
// Root single/array step: use flowInput from context (await the Promise)
handlerInput = await this.context.flowInput;
// Root single/array step: flow_input is guaranteed by SQL (start_tasks)
handlerInput = this.stepTask.flow_input!;
} else {
// Dependent single/array step: use deps object from task input
handlerInput = this.stepTask.input;
Expand Down
12 changes: 7 additions & 5 deletions pkgs/website/src/code-examples/analyze_website_simplified.ts.raw
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@ new Flow<{ url: string; user_id: string }>({
)
.step(
{ slug: 'saveToDb', dependsOn: ['summary', 'tags'] },
async (deps, ctx) =>
await saveWebsite({
user_id: ctx.flowInput.user_id,
website_url: ctx.flowInput.url,
async (deps, ctx) => {
const flowInput = await ctx.flowInput;
return saveWebsite({
user_id: flowInput.user_id,
website_url: flowInput.url,
summary: deps.summary,
tags: deps.tags,
})
});
}
);
24 changes: 14 additions & 10 deletions pkgs/website/src/content/docs/build/process-arrays-in-parallel.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -71,22 +71,22 @@ const CsvProcessor = new Flow<{ csvUrl: string }>({

## Enriching Array Elements with Additional Data

Map handlers only receive individual array elements. If a handler needs access to the original flow input, outputs from other dependencies, or any additional data, include that data in the array elements via a previous step.
Map handlers can access flow input via `await ctx.flowInput`, but for better performance, include needed data directly in the array elements via a previous step.

### Problem: Map Handlers Can't Access Flow Context
### Inefficient: Awaiting flowInput in Each Map Task

```typescript "apiKey: string" del="id, ???" del="(id)"
// This won't work - map handler can't access ctx.flowInput
const ProblemFlow = new Flow<{ apiKey: string, ids: string[] }>({
slug: 'problemFlow',
```typescript "apiKey: string" del="await ctx.flowInput" del="(id, ctx)"
// Works, but inefficient - fetches flowInput for each of N tasks
const InefficientFlow = new Flow<{ apiKey: string, ids: string[] }>({
slug: 'inefficientFlow',
})
.map({ slug: 'fetch' }, async (id) => {
// Can't access flowInput.apiKey here!
return await fetchWithKey(id, ???); // No access to apiKey
.map({ slug: 'fetch' }, async (id, ctx) => {
const flowInput = await ctx.flowInput; // N fetches for N items
return await fetchWithKey(id, flowInput.apiKey);
});
```

### Solution: Enrich Array Elements
### Better: Enrich Array Elements

```typescript ins={4-13} ins="item.id, item.apiKey" ins="(item)"
const SolutionFlow = new Flow<{ apiKey: string, ids: string[] }>({
Expand All @@ -113,6 +113,10 @@ const SolutionFlow = new Flow<{ apiKey: string, ids: string[] }>({

This pattern applies whenever a map handler needs any data beyond the array elements themselves. Add a step before the map that enriches the array elements with whatever data the handler needs - whether that's the original flow input, outputs from other dependencies, or both.

:::tip[Why is ctx.flowInput async?]
`ctx.flowInput` is lazy-loaded to prevent data duplication. For map steps processing thousands of items, including the full flow input in each task record would multiply storage and transfer costs. Instead, it's fetched on-demand when you `await` it and cached per run - subsequent awaits in the same worker reuse the cached value. The enrichment pattern above is still more efficient because it avoids even the initial fetch overhead per task.
:::

:::note[Debugging Map Tasks]
When debugging map steps, use [`context.stepTask.task_index`](/reference/context/#steptask) to identify which array element each task is processing.
:::
Expand Down
8 changes: 6 additions & 2 deletions pkgs/website/src/content/docs/concepts/map-steps.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ Map step handlers receive only the individual array element, not the full input
```typescript "deps"
.step({ slug: 'regular', dependsOn: ['source'] }, async (deps, ctx) => {
// deps contains outputs from dependencies
// ctx.flowInput provides access to the original flow input
// await ctx.flowInput provides access to the original flow input
return processAllData(deps.source);
})
```
Expand All @@ -128,13 +128,17 @@ Map step handlers receive only the individual array element, not the full input
```typescript "item"
.map({ slug: 'mapStep', array: 'source' }, async (item, ctx) => {
// item is ONLY the individual array element
// ctx.flowInput provides access to the original flow input
// await ctx.flowInput provides access to the original flow input
return processItem(item);
})
```

This constraint keeps map handlers focused on transforming individual elements. If a map handler needs additional data from other step outputs, include that data in the array elements via a previous step. The context parameter provides access to flow input when needed.

:::tip[Accessing flowInput in map handlers]
While `await ctx.flowInput` works in map handlers, it's lazy-loaded (fetched on first access, then cached per run). For better performance with large arrays, enrich your array elements with the data they need before mapping. See [Enriching Array Elements](/build/process-arrays-in-parallel/#enriching-array-elements-with-additional-data) for the recommended pattern.
:::

## Root Maps vs Dependent Maps

Map steps operate in two modes:
Expand Down
23 changes: 12 additions & 11 deletions pkgs/website/src/content/docs/concepts/understanding-flows.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,14 @@ Both step types receive a **context object** as the second parameter, which prov

:::note[Accessing Flow Input]
- Root steps: Access directly via the first parameter (e.g., `flowInput.url`)
- Dependent steps: Access via context (e.g., `ctx.flowInput.url`)
- Dependent steps: Access via context (e.g., `(await ctx.flowInput).url` - it's lazy-loaded, so it must be awaited)

This asymmetric design matches the domain - root and dependent steps ARE different.
:::

Consider this example:

```typescript "flowInput.url" "ctx.flowInput.userId" "deps.scrape.content"
```typescript "flowInput.url" "await ctx.flowInput" "deps.scrape.content"
new Flow<{ url: string, userId: string }>({
slug: 'analyzeWebsite',
})
Expand All @@ -63,7 +63,7 @@ new Flow<{ url: string, userId: string }>({
{ slug: 'analyze', dependsOn: ['scrape'] },
async (deps, ctx) => {
// deps contains dependency outputs
// ctx.flowInput provides access to original flow input if needed
// await ctx.flowInput provides access to original flow input if needed
return await analyzeContent(deps.scrape.content);
}
);
Expand All @@ -72,7 +72,7 @@ new Flow<{ url: string, userId: string }>({
When this flow runs:
1. The flow receives an input object (e.g., `{ url: "example.com", userId: "123" }`)
2. Root steps receive the flow input directly as the first parameter
3. Dependent steps receive their dependencies as the first parameter, with flow input available via `ctx.flowInput`
3. Dependent steps receive their dependencies as the first parameter, with flow input available via `await ctx.flowInput`

## The Type System

Expand Down Expand Up @@ -272,7 +272,7 @@ The TypeScript code is used only for definition and compilation, not for executi

Here's a practical example showing how data flows efficiently through steps:

```typescript "flowInput.userId" "ctx.flowInput.reportType" "ctx.flowInput.includeDetails" {21,30,38}
```typescript "flowInput.userId" "await ctx.flowInput" {22-23,32-33,42-43}
// Flow with multiple input parameters
type Input = {
userId: string,
Expand All @@ -294,35 +294,36 @@ new Flow<Input>({
.step(
{ slug: 'activity', dependsOn: ['user'] },
async (deps, ctx) => {
// Uses ctx.flowInput.reportType to determine timespan
const timespan = ctx.flowInput.reportType === 'advanced' ? '1y' : '30d';
const flowInput = await ctx.flowInput;
const timespan = flowInput.reportType === 'advanced' ? '1y' : '30d';
return await getUserActivity(deps.user.id, timespan);
}
)
.step(
{ slug: 'preferences', dependsOn: ['user'] },
async (deps, ctx) => {
// Uses ctx.flowInput.includeDetails parameter
return await getUserPreferences(deps.user.id, ctx.flowInput.includeDetails);
const flowInput = await ctx.flowInput;
return await getUserPreferences(deps.user.id, flowInput.includeDetails);
}
)
// Step 4: Combine results
.step(
{ slug: 'report', dependsOn: ['user', 'activity', 'preferences'] },
async (deps, ctx) => {
const flowInput = await ctx.flowInput;
return {
user: deps.user,
activity: deps.activity,
preferences: deps.preferences,
reportType: ctx.flowInput.reportType, // Original parameter still available
reportType: flowInput.reportType,
generatedAt: new Date().toISOString()
};
}
);
```

This example demonstrates:
1. **Original parameters available via context** - Dependent steps access flow input via `ctx.flowInput`
1. **Original parameters available via context** - Dependent steps access flow input via `await ctx.flowInput`
2. **Conditional processing** - Steps adapt behavior based on original parameters
3. **No manual parameter forwarding needed** - The `user` step doesn't need to include original parameters
4. **Type safety throughout** - TypeScript ensures all data accesses are valid
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ type Input = {
};
```

This defines what data the flow accepts when started. Root steps receive this directly, while dependent steps access it via `ctx.flowInput`.
This defines what data the flow accepts when started. Root steps receive this directly, while dependent steps access it via `await ctx.flowInput`.

### Flow constructor

Expand Down Expand Up @@ -113,7 +113,7 @@ export const GreetUser = new Flow<Input>({
(deps) => ... // deps = { fullName: "John Doe" }
```

Need flow input in a dependent step? Use `ctx.flowInput`.
Need flow input in a dependent step? Use `await ctx.flowInput`.
</Aside>

## Exporting flows
Expand Down Expand Up @@ -145,7 +145,7 @@ Recompiling **deletes the existing flow and all its run data**. See [Startup Com

:::note[Key Concepts]
- **Root steps**: Receive flow input directly as first parameter
- **Dependent steps**: Receive dependency outputs as first parameter, access flow input via `ctx.flowInput`
- **Dependent steps**: Receive dependency outputs as first parameter, access flow input via `await ctx.flowInput`
- **step slugs**: Unique identifiers for each step
- **dependsOn**: Specifies which steps must complete before a step can run
- **JSON serialization**: All inputs and outputs must be JSON-serializable
Expand Down
15 changes: 9 additions & 6 deletions pkgs/website/src/content/docs/index.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -334,12 +334,15 @@ new Flow<{ url: string }>({ slug: 'analyzeArticle' })
(deps) => extractKeywords(deps.fetchArticle)
)
.step({ slug: 'publish', dependsOn: ['fetchArticle', 'summarize', 'extractKeywords'] },
(deps, ctx) => publishArticle({
url: ctx.flowInput.url,
content: deps.fetchArticle,
summary: deps.summarize,
keywords: deps.extractKeywords
})
async (deps, ctx) => {
const flowInput = await ctx.flowInput;
return publishArticle({
url: flowInput.url,
content: deps.fetchArticle,
summary: deps.summarize,
keywords: deps.extractKeywords
});
}
);
```

Expand Down
8 changes: 4 additions & 4 deletions pkgs/website/src/content/docs/reference/compile-api.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ const MyFlow = new Flow<Input>({
timeout: 60,
})
.step({ slug: 'fetch' }, (flowInput) => flowInput.url)
.step({ slug: 'process', dependsOn: ['fetch'] }, (deps, ctx) => ({
url: ctx.flowInput.url,
content: deps.fetch,
}));
.step({ slug: 'process', dependsOn: ['fetch'] }, async (deps, ctx) => {
const flowInput = await ctx.flowInput;
return { url: flowInput.url, content: deps.fetch };
});

const statements = compileFlow(MyFlow);
// Returns: string[]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,8 +276,10 @@ export const AnalyzeWebsite = new Flow<Input>({ slug: "analyzeWebsite", maxAttem
)
.step(
{ slug: "saveToDb", dependsOn: ["summary", "tags"] },
(deps, ctx) =>
saveToDb({ website_url: ctx.flowInput.url, summary: deps.summary, tags: deps.tags }),
async (deps, ctx) => {
const flowInput = await ctx.flowInput;
return saveToDb({ website_url: flowInput.url, summary: deps.summary, tags: deps.tags });
},
);
```

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,13 +223,14 @@ export const GenerateEmbeddings = new Flow<Input>({ slug: 'generateEmbeddings' }
.map({ slug: 'embeddings', array: 'chunks' }, (chunk) =>
generateEmbedding(chunk)
)
.step({ slug: 'save', dependsOn: ['chunks', 'embeddings'] }, (deps, ctx) =>
saveChunks({
documentId: ctx.flowInput.documentId,
.step({ slug: 'save', dependsOn: ['chunks', 'embeddings'] }, async (deps, ctx) => {
const flowInput = await ctx.flowInput;
return saveChunks({
documentId: flowInput.documentId,
chunks: deps.chunks,
embeddings: deps.embeddings,
}, ctx.supabase)
);
}, ctx.supabase);
});
```

**How the flow works:**
Expand Down
21 changes: 12 additions & 9 deletions pkgs/website/src/content/docs/tutorials/use-cases/chatbot.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -221,19 +221,22 @@ export default new Flow<Input>({ slug: 'chatbot' })
)
.step(
{ slug: 'response', dependsOn: ['history', 'knowledge'] },
(deps, ctx) =>
generateResponse({
message: ctx.flowInput.message,
async (deps, ctx) => {
const flowInput = await ctx.flowInput;
return generateResponse({
message: flowInput.message,
history: deps.history,
knowledge: deps.knowledge,
})
});
}
)
.step({ slug: 'save', dependsOn: ['response'] }, (deps, ctx) =>
saveMessage({
conversationId: ctx.flowInput.conversationId,
.step({ slug: 'save', dependsOn: ['response'] }, async (deps, ctx) => {
const flowInput = await ctx.flowInput;
return saveMessage({
conversationId: flowInput.conversationId,
content: deps.response,
})
);
});
});
```

The `history` and `knowledge` steps run in parallel since neither depends on the other. Both results feed into `response`, which generates an AI reply enriched with conversation context and relevant knowledge.
Expand Down
Loading