diff --git a/pkgs/dsl/__tests__/types/dsl-types.test-d.ts b/pkgs/dsl/__tests__/types/dsl-types.test-d.ts index 748fd5026..1a03418f6 100644 --- a/pkgs/dsl/__tests__/types/dsl-types.test-d.ts +++ b/pkgs/dsl/__tests__/types/dsl-types.test-d.ts @@ -78,6 +78,24 @@ describe('Flow Type System Tests', () => { }); }); + describe('Empty dependsOn array handling', () => { + it('should reject empty dependsOn array at compile time', () => { + // Empty dependsOn: [] is semantically meaningless - if you have no dependencies, + // simply omit dependsOn. Allowing it creates a type mismatch where: + // - TypeScript infers deps as {} (empty object from never[]) + // - Runtime treats it as a root step and passes flowInput + // + // This test verifies that dependsOn: [] is rejected at compile time. + new Flow<{ userId: string }>({ slug: 'test_flow' }) + .step({ slug: 'root' }, () => ({ value: 1 })) + // @ts-expect-error - empty dependsOn array should be rejected + .step({ slug: 'bad_step', dependsOn: [] }, (deps) => { + // If this compiled, deps would be {} but runtime would pass { userId: string } + return { result: deps }; + }); + }); + }); + describe('Multi-level dependencies', () => { it('should correctly type multi-level dependencies', () => { new Flow({ slug: 'test_flow' }) diff --git a/pkgs/dsl/src/dsl.ts b/pkgs/dsl/src/dsl.ts index b96fc43a3..13d05d885 100644 --- a/pkgs/dsl/src/dsl.ts +++ b/pkgs/dsl/src/dsl.ts @@ -431,12 +431,13 @@ export class Flow< >; // Overload 2: Dependent step (with dependsOn) - receives deps, flowInput via context + // Note: [Deps, ...Deps[]] requires at least one dependency - empty arrays are rejected at compile time step< Slug extends string, Deps extends Extract, TOutput >( - opts: Simplify<{ slug: Slug extends keyof Steps ? never : Slug; dependsOn: Deps[] } & StepRuntimeOptions>, + opts: Simplify<{ slug: Slug extends keyof Steps ? never : Slug; dependsOn: [Deps, ...Deps[]] } & StepRuntimeOptions>, handler: ( deps: { [K in Deps]: K extends keyof Steps ? Steps[K] : never }, context: FlowContext & TContext @@ -537,12 +538,13 @@ export class Flow< >; // Overload 2: Dependent array (with dependsOn) - receives deps, flowInput via context + // Note: [Deps, ...Deps[]] requires at least one dependency - empty arrays are rejected at compile time array< Slug extends string, Deps extends Extract, TOutput extends readonly any[] >( - opts: Simplify<{ slug: Slug extends keyof Steps ? never : Slug; dependsOn: Deps[] } & StepRuntimeOptions>, + opts: Simplify<{ slug: Slug extends keyof Steps ? never : Slug; dependsOn: [Deps, ...Deps[]] } & StepRuntimeOptions>, handler: ( deps: { [K in Deps]: K extends keyof Steps ? Steps[K] : never }, context: FlowContext & TContext diff --git a/pkgs/edge-worker/src/flow/StepTaskExecutor.ts b/pkgs/edge-worker/src/flow/StepTaskExecutor.ts index 9f77fffeb..4cbaa0dd8 100644 --- a/pkgs/edge-worker/src/flow/StepTaskExecutor.ts +++ b/pkgs/edge-worker/src/flow/StepTaskExecutor.ts @@ -111,8 +111,8 @@ export class StepTaskExecutor({ ) .step( { slug: 'saveToDb', dependsOn: ['summary', 'tags'] }, - async (deps, ctx) => - await saveWebsite({ - user_id: ctx.flowInput.user_id, - website_url: ctx.flowInput.url, + async (deps, ctx) => { + const flowInput = await ctx.flowInput; + return saveWebsite({ + user_id: flowInput.user_id, + website_url: flowInput.url, summary: deps.summary, tags: deps.tags, - }) + }); + } ); diff --git a/pkgs/website/src/content/docs/build/process-arrays-in-parallel.mdx b/pkgs/website/src/content/docs/build/process-arrays-in-parallel.mdx index 5829e9cd8..a7041c51c 100644 --- a/pkgs/website/src/content/docs/build/process-arrays-in-parallel.mdx +++ b/pkgs/website/src/content/docs/build/process-arrays-in-parallel.mdx @@ -71,22 +71,22 @@ const CsvProcessor = new Flow<{ csvUrl: string }>({ ## Enriching Array Elements with Additional Data -Map handlers only receive individual array elements. If a handler needs access to the original flow input, outputs from other dependencies, or any additional data, include that data in the array elements via a previous step. +Map handlers can access flow input via `await ctx.flowInput`, but for better performance, include needed data directly in the array elements via a previous step. -### Problem: Map Handlers Can't Access Flow Context +### Inefficient: Awaiting flowInput in Each Map Task -```typescript "apiKey: string" del="id, ???" del="(id)" -// This won't work - map handler can't access ctx.flowInput -const ProblemFlow = new Flow<{ apiKey: string, ids: string[] }>({ - slug: 'problemFlow', +```typescript "apiKey: string" del="await ctx.flowInput" del="(id, ctx)" +// Works, but inefficient - fetches flowInput for each of N tasks +const InefficientFlow = new Flow<{ apiKey: string, ids: string[] }>({ + slug: 'inefficientFlow', }) - .map({ slug: 'fetch' }, async (id) => { - // Can't access flowInput.apiKey here! - return await fetchWithKey(id, ???); // No access to apiKey + .map({ slug: 'fetch' }, async (id, ctx) => { + const flowInput = await ctx.flowInput; // N fetches for N items + return await fetchWithKey(id, flowInput.apiKey); }); ``` -### Solution: Enrich Array Elements +### Better: Enrich Array Elements ```typescript ins={4-13} ins="item.id, item.apiKey" ins="(item)" const SolutionFlow = new Flow<{ apiKey: string, ids: string[] }>({ @@ -113,6 +113,10 @@ const SolutionFlow = new Flow<{ apiKey: string, ids: string[] }>({ This pattern applies whenever a map handler needs any data beyond the array elements themselves. Add a step before the map that enriches the array elements with whatever data the handler needs - whether that's the original flow input, outputs from other dependencies, or both. +:::tip[Why is ctx.flowInput async?] +`ctx.flowInput` is lazy-loaded to prevent data duplication. For map steps processing thousands of items, including the full flow input in each task record would multiply storage and transfer costs. Instead, it's fetched on-demand when you `await` it and cached per run - subsequent awaits in the same worker reuse the cached value. The enrichment pattern above is still more efficient because it avoids even the initial fetch overhead per task. +::: + :::note[Debugging Map Tasks] When debugging map steps, use [`context.stepTask.task_index`](/reference/context/#steptask) to identify which array element each task is processing. ::: diff --git a/pkgs/website/src/content/docs/concepts/map-steps.mdx b/pkgs/website/src/content/docs/concepts/map-steps.mdx index 6a78fd101..6e116d185 100644 --- a/pkgs/website/src/content/docs/concepts/map-steps.mdx +++ b/pkgs/website/src/content/docs/concepts/map-steps.mdx @@ -119,7 +119,7 @@ Map step handlers receive only the individual array element, not the full input ```typescript "deps" .step({ slug: 'regular', dependsOn: ['source'] }, async (deps, ctx) => { // deps contains outputs from dependencies - // ctx.flowInput provides access to the original flow input + // await ctx.flowInput provides access to the original flow input return processAllData(deps.source); }) ``` @@ -128,13 +128,17 @@ Map step handlers receive only the individual array element, not the full input ```typescript "item" .map({ slug: 'mapStep', array: 'source' }, async (item, ctx) => { // item is ONLY the individual array element - // ctx.flowInput provides access to the original flow input + // await ctx.flowInput provides access to the original flow input return processItem(item); }) ``` This constraint keeps map handlers focused on transforming individual elements. If a map handler needs additional data from other step outputs, include that data in the array elements via a previous step. The context parameter provides access to flow input when needed. +:::tip[Accessing flowInput in map handlers] +While `await ctx.flowInput` works in map handlers, it's lazy-loaded (fetched on first access, then cached per run). For better performance with large arrays, enrich your array elements with the data they need before mapping. See [Enriching Array Elements](/build/process-arrays-in-parallel/#enriching-array-elements-with-additional-data) for the recommended pattern. +::: + ## Root Maps vs Dependent Maps Map steps operate in two modes: diff --git a/pkgs/website/src/content/docs/concepts/understanding-flows.mdx b/pkgs/website/src/content/docs/concepts/understanding-flows.mdx index f9c0f6f0a..e75f13b40 100644 --- a/pkgs/website/src/content/docs/concepts/understanding-flows.mdx +++ b/pkgs/website/src/content/docs/concepts/understanding-flows.mdx @@ -41,14 +41,14 @@ Both step types receive a **context object** as the second parameter, which prov :::note[Accessing Flow Input] - Root steps: Access directly via the first parameter (e.g., `flowInput.url`) -- Dependent steps: Access via context (e.g., `ctx.flowInput.url`) +- Dependent steps: Access via context (e.g., `await ctx.flowInput` - it's lazy-loaded) This asymmetric design matches the domain - root and dependent steps ARE different. ::: Consider this example: -```typescript "flowInput.url" "ctx.flowInput.userId" "deps.scrape.content" +```typescript "flowInput.url" "await ctx.flowInput" "deps.scrape.content" new Flow<{ url: string, userId: string }>({ slug: 'analyzeWebsite', }) @@ -63,7 +63,7 @@ new Flow<{ url: string, userId: string }>({ { slug: 'analyze', dependsOn: ['scrape'] }, async (deps, ctx) => { // deps contains dependency outputs - // ctx.flowInput provides access to original flow input if needed + // await ctx.flowInput provides access to original flow input if needed return await analyzeContent(deps.scrape.content); } ); @@ -72,7 +72,7 @@ new Flow<{ url: string, userId: string }>({ When this flow runs: 1. The flow receives an input object (e.g., `{ url: "example.com", userId: "123" }`) 2. Root steps receive the flow input directly as the first parameter -3. Dependent steps receive their dependencies as the first parameter, with flow input available via `ctx.flowInput` +3. Dependent steps receive their dependencies as the first parameter, with flow input available via `await ctx.flowInput` ## The Type System @@ -272,7 +272,7 @@ The TypeScript code is used only for definition and compilation, not for executi Here's a practical example showing how data flows efficiently through steps: -```typescript "flowInput.userId" "ctx.flowInput.reportType" "ctx.flowInput.includeDetails" {21,30,38} +```typescript "flowInput.userId" "await ctx.flowInput" {22-23,32-33,42-43} // Flow with multiple input parameters type Input = { userId: string, @@ -294,27 +294,28 @@ new Flow({ .step( { slug: 'activity', dependsOn: ['user'] }, async (deps, ctx) => { - // Uses ctx.flowInput.reportType to determine timespan - const timespan = ctx.flowInput.reportType === 'advanced' ? '1y' : '30d'; + const flowInput = await ctx.flowInput; + const timespan = flowInput.reportType === 'advanced' ? '1y' : '30d'; return await getUserActivity(deps.user.id, timespan); } ) .step( { slug: 'preferences', dependsOn: ['user'] }, async (deps, ctx) => { - // Uses ctx.flowInput.includeDetails parameter - return await getUserPreferences(deps.user.id, ctx.flowInput.includeDetails); + const flowInput = await ctx.flowInput; + return await getUserPreferences(deps.user.id, flowInput.includeDetails); } ) // Step 4: Combine results .step( { slug: 'report', dependsOn: ['activity', 'preferences'] }, async (deps, ctx) => { + const flowInput = await ctx.flowInput; return { user: deps.user, activity: deps.activity, preferences: deps.preferences, - reportType: ctx.flowInput.reportType, // Original parameter still available + reportType: flowInput.reportType, generatedAt: new Date().toISOString() }; } @@ -322,7 +323,7 @@ new Flow({ ``` This example demonstrates: -1. **Original parameters available via context** - Dependent steps access flow input via `ctx.flowInput` +1. **Original parameters available via context** - Dependent steps access flow input via `await ctx.flowInput` 2. **Conditional processing** - Steps adapt behavior based on original parameters 3. **No manual parameter forwarding needed** - The `user` step doesn't need to include original parameters 4. **Type safety throughout** - TypeScript ensures all data accesses are valid diff --git a/pkgs/website/src/content/docs/get-started/flows/create-flow.mdx b/pkgs/website/src/content/docs/get-started/flows/create-flow.mdx index bc609dbd6..33a4ab228 100644 --- a/pkgs/website/src/content/docs/get-started/flows/create-flow.mdx +++ b/pkgs/website/src/content/docs/get-started/flows/create-flow.mdx @@ -62,7 +62,7 @@ type Input = { }; ``` -This defines what data the flow accepts when started. Root steps receive this directly, while dependent steps access it via `ctx.flowInput`. +This defines what data the flow accepts when started. Root steps receive this directly, while dependent steps access it via `await ctx.flowInput`. ### Flow constructor @@ -113,7 +113,7 @@ export const GreetUser = new Flow({ (deps) => ... // deps = { fullName: "John Doe" } ``` -Need flow input in a dependent step? Use `ctx.flowInput`. +Need flow input in a dependent step? Use `await ctx.flowInput`. ## Exporting flows @@ -145,7 +145,7 @@ Recompiling **deletes the existing flow and all its run data**. See [Startup Com :::note[Key Concepts] - **Root steps**: Receive flow input directly as first parameter -- **Dependent steps**: Receive dependency outputs as first parameter, access flow input via `ctx.flowInput` +- **Dependent steps**: Receive dependency outputs as first parameter, access flow input via `await ctx.flowInput` - **step slugs**: Unique identifiers for each step - **dependsOn**: Specifies which steps must complete before a step can run - **JSON serialization**: All inputs and outputs must be JSON-serializable diff --git a/pkgs/website/src/content/docs/index.mdx b/pkgs/website/src/content/docs/index.mdx index a87bb79c9..a96077656 100644 --- a/pkgs/website/src/content/docs/index.mdx +++ b/pkgs/website/src/content/docs/index.mdx @@ -334,12 +334,15 @@ new Flow<{ url: string }>({ slug: 'analyzeArticle' }) (deps) => extractKeywords(deps.fetchArticle) ) .step({ slug: 'publish', dependsOn: ['summarize', 'extractKeywords'] }, - (deps, ctx) => publishArticle({ - url: ctx.flowInput.url, - content: deps.fetchArticle, - summary: deps.summarize, - keywords: deps.extractKeywords - }) + async (deps, ctx) => { + const flowInput = await ctx.flowInput; + return publishArticle({ + url: flowInput.url, + content: deps.fetchArticle, + summary: deps.summarize, + keywords: deps.extractKeywords + }); + } ); ``` diff --git a/pkgs/website/src/content/docs/reference/compile-api.mdx b/pkgs/website/src/content/docs/reference/compile-api.mdx index d572b4cc7..8dccb28bc 100644 --- a/pkgs/website/src/content/docs/reference/compile-api.mdx +++ b/pkgs/website/src/content/docs/reference/compile-api.mdx @@ -30,10 +30,10 @@ const MyFlow = new Flow({ timeout: 60, }) .step({ slug: 'fetch' }, (flowInput) => flowInput.url) - .step({ slug: 'process', dependsOn: ['fetch'] }, (deps, ctx) => ({ - url: ctx.flowInput.url, - content: deps.fetch, - })); + .step({ slug: 'process', dependsOn: ['fetch'] }, async (deps, ctx) => { + const flowInput = await ctx.flowInput; + return { url: flowInput.url, content: deps.fetch }; + }); const statements = compileFlow(MyFlow); // Returns: string[] diff --git a/pkgs/website/src/content/docs/tutorials/ai-web-scraper/backend.mdx b/pkgs/website/src/content/docs/tutorials/ai-web-scraper/backend.mdx index 687138b8b..e56ca3df6 100644 --- a/pkgs/website/src/content/docs/tutorials/ai-web-scraper/backend.mdx +++ b/pkgs/website/src/content/docs/tutorials/ai-web-scraper/backend.mdx @@ -276,8 +276,10 @@ export const AnalyzeWebsite = new Flow({ slug: "analyzeWebsite", maxAttem ) .step( { slug: "saveToDb", dependsOn: ["summary", "tags"] }, - (deps, ctx) => - saveToDb({ website_url: ctx.flowInput.url, summary: deps.summary, tags: deps.tags }), + async (deps, ctx) => { + const flowInput = await ctx.flowInput; + return saveToDb({ website_url: flowInput.url, summary: deps.summary, tags: deps.tags }); + }, ); ``` diff --git a/pkgs/website/src/content/docs/tutorials/rag/automatic-embeddings.mdx b/pkgs/website/src/content/docs/tutorials/rag/automatic-embeddings.mdx index f92b33418..03d7150db 100644 --- a/pkgs/website/src/content/docs/tutorials/rag/automatic-embeddings.mdx +++ b/pkgs/website/src/content/docs/tutorials/rag/automatic-embeddings.mdx @@ -223,13 +223,14 @@ export const GenerateEmbeddings = new Flow({ slug: 'generateEmbeddings' } .map({ slug: 'embeddings', array: 'chunks' }, (chunk) => generateEmbedding(chunk) ) - .step({ slug: 'save', dependsOn: ['chunks', 'embeddings'] }, (deps, ctx) => - saveChunks({ - documentId: ctx.flowInput.documentId, + .step({ slug: 'save', dependsOn: ['chunks', 'embeddings'] }, async (deps, ctx) => { + const flowInput = await ctx.flowInput; + return saveChunks({ + documentId: flowInput.documentId, chunks: deps.chunks, embeddings: deps.embeddings, - }, ctx.supabase) - ); + }, ctx.supabase); + }); ``` **How the flow works:** diff --git a/pkgs/website/src/content/docs/tutorials/use-cases/chatbot.mdx b/pkgs/website/src/content/docs/tutorials/use-cases/chatbot.mdx index 4d0b5b1ad..60338a5c0 100644 --- a/pkgs/website/src/content/docs/tutorials/use-cases/chatbot.mdx +++ b/pkgs/website/src/content/docs/tutorials/use-cases/chatbot.mdx @@ -221,19 +221,22 @@ export default new Flow({ slug: 'chatbot' }) ) .step( { slug: 'response', dependsOn: ['history', 'knowledge'] }, - (deps, ctx) => - generateResponse({ - message: ctx.flowInput.message, + async (deps, ctx) => { + const flowInput = await ctx.flowInput; + return generateResponse({ + message: flowInput.message, history: deps.history, knowledge: deps.knowledge, - }) + }); + } ) - .step({ slug: 'save', dependsOn: ['response'] }, (deps, ctx) => - saveMessage({ - conversationId: ctx.flowInput.conversationId, + .step({ slug: 'save', dependsOn: ['response'] }, async (deps, ctx) => { + const flowInput = await ctx.flowInput; + return saveMessage({ + conversationId: flowInput.conversationId, content: deps.response, - }) - ); + }); + }); ``` The `history` and `knowledge` steps run in parallel since neither depends on the other. Both results feed into `response`, which generates an AI reply enriched with conversation context and relevant knowledge. diff --git a/pkgs/website/src/content/docs/tutorials/use-cases/rag-pipeline.mdx b/pkgs/website/src/content/docs/tutorials/use-cases/rag-pipeline.mdx index a68ec42f3..e8ec15467 100644 --- a/pkgs/website/src/content/docs/tutorials/use-cases/rag-pipeline.mdx +++ b/pkgs/website/src/content/docs/tutorials/use-cases/rag-pipeline.mdx @@ -210,18 +210,20 @@ export default new Flow({ slug: 'ragSearch' }) limit: 20, }) ) - .step({ slug: 'rerank', dependsOn: ['retrieve', 'transform'] }, (deps, ctx) => - rerankResults({ - query: ctx.flowInput.query, + .step({ slug: 'rerank', dependsOn: ['retrieve', 'transform'] }, async (deps, ctx) => { + const flowInput = await ctx.flowInput; + return rerankResults({ + query: flowInput.query, documents: deps.retrieve, - }) - ) - .step({ slug: 'answer', dependsOn: ['rerank'] }, (deps, ctx) => - generateAnswer({ - query: ctx.flowInput.query, + }); + }) + .step({ slug: 'answer', dependsOn: ['rerank'] }, async (deps, ctx) => { + const flowInput = await ctx.flowInput; + return generateAnswer({ + query: flowInput.query, context: deps.rerank, - }) - ); + }); + }); ``` ## Compile and Deploy