Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions .changeset/asymmetric-handler-signatures.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
---
'@pgflow/core': minor
'@pgflow/dsl': minor
'@pgflow/client': minor
'@pgflow/edge-worker': minor
'pgflow': minor
---

BREAKING: Asymmetric handler signatures - remove `run` key from step inputs

- Root steps: `(flowInput, ctx) => ...` - flow input directly as first param
- Dependent steps: `(deps, ctx) => ...` - only dependency outputs as first param
- Access flow input in dependent steps via `ctx.flowInput`
- Enables functional composition and simplifies types for future subflows
Original file line number Diff line number Diff line change
@@ -0,0 +1,245 @@
---
draft: true
title: 'pgflow 0.12.0: Simpler Handler Signatures for Flow Composition'
description: 'Breaking change: asymmetric handler signatures remove the run wrapper, enabling functional composition'
date: 2025-12-24
authors:
- jumski
tags:
- release
- breaking-change
- migration
featured: true
---

import { Aside, Steps } from "@astrojs/starlight/components";

pgflow 0.12.0 introduces asymmetric handler signatures - a breaking change that removes the `run` wrapper from step inputs, enabling cleaner functional composition and preparing the foundation for subflows.

## What Changed

Handler signatures are now asymmetric based on step type:

| Step Type | Before | After |
|-----------|--------|-------|
| Root step | `(input) => input.run.xxx` | `(flowInput, ctx) => flowInput.xxx` |
| Dependent step | `(input) => input.dep.xxx` | `(deps, ctx) => deps.dep.xxx` |
| Dependent (needs flowInput) | `(input) => input.run.xxx` | `(deps, ctx) => ctx.flowInput.xxx` |
| Map step | `(item) => ...` | `(item, ctx) => ctx.flowInput.xxx` |

<Aside type="caution" title="Breaking Change">
All handler signatures must be updated. The `input.run` pattern no longer exists.
</Aside>

## Why This Change?

The previous `run` wrapper blocked functional composition:

```typescript
// OLD: The run wrapper created type mismatches
// Root steps received: { run: flowInput }
// Dependent steps received: { run: flowInput, dep1: output1 }

// This meant subflows couldn't compose cleanly:
const ChildFlow = new Flow<{ data: string }>()
.step({ slug: "process" }, (input) => {
// Expected: input.data
// Received: { run: parentInput, prep: { data: "..." } }
// TYPE MISMATCH!
});
```

By removing the wrapper, outputs from one flow can now become inputs to another without transformation.

## New Handler Signatures

### Root Steps

Root steps receive flow input directly as the first parameter:

```typescript
// .step() - root
.step({ slug: 'init' }, (flowInput, ctx) => {
console.log(ctx.env.API_KEY); // Context still available
return { userId: flowInput.userId };
})

// .array() - root (returns array for parallel processing)
.array({ slug: 'fetchAll' }, (flowInput, ctx) => {
return flowInput.urls; // Returns array directly
})
```

### Dependent Steps

Dependent steps receive only their dependencies as the first parameter. Access `flowInput` via context if needed:

```typescript
// .step() - dependent (needs flowInput)
.step({ slug: 'process', dependsOn: ['init'] }, (deps, ctx) => {
const config = ctx.flowInput.config; // Via context
const data = deps.init.userId; // Dependencies directly
return { result: process(data, config) };
})

// .step() - dependent (doesn't need flowInput - most common)
.step({ slug: 'save', dependsOn: ['process'] }, (deps) => {
return deps.process.result; // Just use deps
})

// .array() - dependent
.array({ slug: 'splitResults', dependsOn: ['process'] }, (deps, ctx) => {
return deps.process.items; // Returns array for parallel processing
})
```

### Map Steps

Map steps receive individual array elements. Access `flowInput` via context:

```typescript
// .map() - processes each array element
.map({ slug: 'processItem', array: 'fetchAll' }, (item, ctx) => {
const config = ctx.flowInput.config; // Original flow input via context
return transform(item, config);
})
```

## Upgrading Your Flows

### Migration Patterns

Apply these transformations to your handlers:

#### Root Steps (.step, .array)

```typescript del="input" del="input.run" ins="flowInput"
// BEFORE
.step({ slug: 'init' }, (input) => {
return { userId: input.run.userId };
})

// AFTER
.step({ slug: 'init' }, (flowInput) => {
return { userId: flowInput.userId };
})
```

```typescript del="input" del="input.run" ins="flowInput"
// BEFORE
.array({ slug: 'getUrls' }, (input) => {
return input.run.urls;
})

// AFTER
.array({ slug: 'getUrls' }, (flowInput) => {
return flowInput.urls;
})
```

#### Dependent Steps - Needing flowInput

```typescript del="input" del="input.run" del="input.init" ins="deps, ctx" ins="ctx.flowInput" ins="deps.init"
// BEFORE
.step({ slug: 'process', dependsOn: ['init'] }, (input) => {
const config = input.run.config;
const data = input.init.data;
return combine(data, config);
})

// AFTER
.step({ slug: 'process', dependsOn: ['init'] }, (deps, ctx) => {
const config = ctx.flowInput.config;
const data = deps.init.data;
return combine(data, config);
})
```

#### Dependent Steps - Not Needing flowInput (Common Case)

```typescript del="input" del="input.process" ins="deps" ins="deps.process"
// BEFORE
.step({ slug: 'save', dependsOn: ['process'] }, (input) => {
return saveToDb(input.process.result);
})

// AFTER
.step({ slug: 'save', dependsOn: ['process'] }, (deps) => {
return saveToDb(deps.process.result);
})
```

#### Map Steps - Accessing flowInput

```typescript ins="item, ctx" ins="ctx.flowInput.options"
// BEFORE
.map({ slug: 'transform', array: 'items' }, (item) => {
return process(item);
})

// AFTER (if you need flowInput)
.map({ slug: 'transform', array: 'items' }, (item, ctx) => {
return process(item, ctx.flowInput.options);
})
```

### Production Upgrade Guide

<Steps>

1. **Update handlers locally and test**

Update all your flow handlers to the new signatures. Test locally:

```bash frame="none"
npx supabase functions serve my-worker
```

2. **Disable worker functions**

Prevent cron from starting new workers with old code:

```sql
UPDATE pgflow.worker_functions
SET enabled = false
WHERE function_name = 'my-worker';
```

3. **Deprecate existing workers**

```sql
UPDATE pgflow.workers
SET deprecated_at = NOW()
WHERE function_name = 'my-worker'
AND deprecated_at IS NULL;
```

Deprecated workers finish their current task but won't call `start_tasks` again - so they won't be affected by the SQL changes.

4. **Apply database migration**

```bash frame="none"
npx supabase db push
```

5. **Deploy new workers**

```bash frame="none"
npx supabase functions deploy my-worker
```

6. **Enable worker functions**

```sql
UPDATE pgflow.worker_functions
SET enabled = true
WHERE function_name = 'my-worker';
```

The pgflow cron automatically starts new workers within seconds.

</Steps>

---

Questions or issues? Join the [Discord community](https://discord.gg/pgflow) or [open a GitHub issue](https://github.com/pgflow-dev/pgflow/issues).