1 change: 1 addition & 0 deletions docs/docs.json
@@ -256,6 +256,7 @@
"group": "Automations",
"pages": [
"v3/advanced/detect-zombie-flows",
"v3/advanced/retry-flows-on-submission-failure",
"v3/advanced/use-custom-event-grammar"
]
},
129 changes: 129 additions & 0 deletions docs/v3/advanced/retry-flows-on-submission-failure.mdx
@@ -0,0 +1,129 @@
---
title: How to retry flows that fail during submission
sidebarTitle: Retry Submission Failures
description: Learn how to use automations to retry flow runs that crash before reaching the running state.
---

## Overview

Sometimes flow runs fail before they ever start executing: they can crash during submission, infrastructure provisioning, or another pre-execution step. Unlike regular retries, which handle failures during execution, these submission failures need special handling because the flow run never reaches the `Running` state.

Prefect tracks submission attempts separately from execution attempts using the `submission_count` field. This enables you to create automations that retry flows based on how many times they've been submitted, not just how many times they've run.

## Use case: Retry infrastructure failures

A common scenario is when flow runs fail due to transient infrastructure issues during submission:

- Worker is temporarily unavailable
- Network connectivity problems
- Infrastructure provisioning failures
- Resource constraints during submission

These failures occur before the flow run reaches the `Running` state, so `run_count` stays at 0 while `submission_count` increments.
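
Both counters appear as labels on the flow run resource of every flow run state-change event, which makes this situation easy to recognize. The sketch below is illustrative only; it shows the relevant labels on a `prefect.flow-run.Crashed` event for a run that was submitted once but never started (the resource ID and name are placeholders):

```python
# Illustrative only: labels carried by a Crashed event for a run that entered
# Pending once but never reached Running (ID and name are placeholders).
crashed_event_resource = {
    "prefect.resource.id": "prefect.flow-run.<flow-run-id>",
    "prefect.resource.name": "my-flow-run",
    "prefect.run-count": "0",          # never reached Running
    "prefect.submission-count": "1",   # entered Pending once
}
```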

## Creating a submission retry automation

Here's how to create an automation that retries flows that fail during submission:

```python retry_submission_failures.py
from prefect.automations import Automation, EventTrigger, Posture, RunDeployment

# Create an automation that triggers when a flow run crashes with
# submission_count > 0 but run_count = 0
submission_retry = Automation(
    name="Retry submission failures",
    description="Retry flows that crash before running (max 3 submission attempts)",
    trigger=EventTrigger(
        expect={"prefect.flow-run.Crashed"},
        match={
            # Both counters are emitted as labels on the flow run resource:
            # only match runs that were submitted but never started running,
            # and only retry while the submission count is below 3.
            "prefect.run-count": "0",
            "prefect.submission-count": ["1", "2"],
        },
        posture=Posture.Reactive,
    ),
    actions=[
        RunDeployment(
            # Re-run the deployment the crashed run came from
            source="inferred",
            # No parameter overrides; the deployment's defaults are used
            parameters=None,
        )
    ],
    enabled=True,
)

# Deploy the automation
if __name__ == "__main__":
    submission_retry.create()
    print(f"Created automation: {submission_retry.name}")
```

<Tip>
**Understanding the trigger**

The automation triggers when:
- A flow run crashes (`prefect.flow-run.Crashed`)
- The run count is 0 (never started running)
- The submission count is 1 or 2 (limit retries to 3 total attempts)

This ensures you retry submission failures without creating infinite loops.
</Tip>

## Testing the automation

To test this automation, you need a scenario where infrastructure fails during submission, such as:
- A deployment with a failing pull step (e.g., non-existent git repository)
- Worker unavailability during flow run submission
- Network failures during infrastructure provisioning

The automation will detect when `submission_count > 0` but `run_count = 0`, indicating the flow was submitted but never started running.
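
If you don't want to break real infrastructure just to see the trigger fire, you can simulate the same transitions by hand. The sketch below is a minimal illustration, not a prescribed workflow; it assumes a deployment named `my-flow/my-deployment` already exists and uses the client to walk a run to `Pending` and then `Crashed` without it ever reaching `Running`:

```python
import asyncio

from prefect import get_client
from prefect.states import Crashed, Pending


async def simulate_submission_failure():
    async with get_client() as client:
        # Assumes a deployment named "my-flow/my-deployment" already exists
        deployment = await client.read_deployment_by_name("my-flow/my-deployment")
        flow_run = await client.create_flow_run_from_deployment(deployment.id)

        # Entering Pending increments submission_count
        await client.set_flow_run_state(flow_run.id, state=Pending())

        # Crashing before Running leaves run_count at 0
        await client.set_flow_run_state(
            flow_run.id,
            state=Crashed(message="Simulated submission failure"),
            force=True,
        )


asyncio.run(simulate_submission_failure())
```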

## Monitoring submission attempts

You can view submission counts in the Prefect UI or via the API:

```python check_submission_count.py
import asyncio

from prefect import get_client
from prefect.client.schemas.sorting import FlowRunSort


async def check_counts():
    async with get_client() as client:
        # Get recent flow runs, newest first
        flow_runs = await client.read_flow_runs(
            limit=10,
            sort=FlowRunSort.ID_DESC,
        )

        for run in flow_runs:
            print(f"Flow Run: {run.name}")
            print(f"  State: {run.state_type}")
            print(f"  Submission Count: {run.submission_count}")
            print(f"  Run Count: {run.run_count}")
            print()


asyncio.run(check_counts())
```

## Key differences: submission_count vs run_count

| Field | Increments when | Use case |
|-------|----------------|----------|
| `submission_count` | Flow enters `Pending` state | Retry infrastructure/submission failures |
| `run_count` | Flow enters `Running` state | Retry execution failures |
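
For comparison, failures that happen after a run starts are covered by Prefect's built-in retries and need no automation. A minimal sketch (the flow name and retry values are arbitrary):

```python
from prefect import flow


# Each retry after an execution failure increments run_count; submission
# failures never reach this point, which is why they need the automation above.
@flow(retries=2, retry_delay_seconds=10)
def unreliable_flow():
    ...
```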

<Warning>
**Avoid infinite loops**

Always set a maximum on `submission_count` in your automation triggers to prevent infinite retry loops. A good default is 3 attempts.
</Warning>

## Related resources

- [Custom events and triggers](/v3/advanced/use-custom-event-grammar)
- [Automations API reference](/v3/api-ref/python/prefect-events-schemas-automations)
3 changes: 3 additions & 0 deletions src/prefect/client/schemas/objects.py
@@ -613,6 +613,9 @@ class FlowRun(TimeSeriesBaseModel, ObjectBaseModel):
    run_count: int = Field(
        default=0, description="The number of times the flow run was executed."
    )
    submission_count: int = Field(
        default=0, description="The number of times the flow run was submitted."
    )
    expected_start_time: Optional[DateTime] = Field(
        default=None,
        description="The flow run's expected start time.",
@@ -0,0 +1,36 @@
"""add_submission_count_to_flow_run

Revision ID: eb150e4749db
Revises: 9e83011d1f2a
Create Date: 2025-10-13 14:34:39.623614

"""

import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision = "eb150e4749db"
down_revision = "9e83011d1f2a"
branch_labels = None
depends_on = None


def upgrade():
    # ### commands auto generated by Alembic - please adjust! ###
    op.add_column(
        "flow_run",
        sa.Column("submission_count", sa.Integer(), server_default="0", nullable=False),
    )
    op.add_column(
        "task_run",
        sa.Column("submission_count", sa.Integer(), server_default="0", nullable=False),
    )
    # ### end Alembic commands ###


def downgrade():
    # ### commands auto generated by Alembic - please adjust! ###
    op.drop_column("task_run", "submission_count")
    op.drop_column("flow_run", "submission_count")
    # ### end Alembic commands ###
@@ -0,0 +1,44 @@
"""add_submission_count_to_flow_run

Revision ID: 5b2123de909c
Revises: 9e83011d1f2a
Create Date: 2025-10-13 14:31:33.830113

"""

import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision = "5b2123de909c"
down_revision = "9e83011d1f2a"
branch_labels = None
depends_on = None


def upgrade():
    # ### commands auto generated by Alembic - please adjust! ###
    with op.batch_alter_table("flow_run", schema=None) as batch_op:
        batch_op.add_column(
            sa.Column(
                "submission_count", sa.Integer(), server_default="0", nullable=False
            )
        )

    with op.batch_alter_table("task_run", schema=None) as batch_op:
        batch_op.add_column(
            sa.Column(
                "submission_count", sa.Integer(), server_default="0", nullable=False
            )
        )
    # ### end Alembic commands ###


def downgrade():
    # ### commands auto generated by Alembic - please adjust! ###
    with op.batch_alter_table("task_run", schema=None) as batch_op:
        batch_op.drop_column("submission_count")

    with op.batch_alter_table("flow_run", schema=None) as batch_op:
        batch_op.drop_column("submission_count")
    # ### end Alembic commands ###
1 change: 1 addition & 0 deletions src/prefect/server/database/orm_models.py
@@ -385,6 +385,7 @@ class Run(Base):
    state_name: Mapped[Optional[str]]
    state_timestamp: Mapped[Optional[DateTime]]
    run_count: Mapped[int] = mapped_column(server_default="0", default=0)
    submission_count: Mapped[int] = mapped_column(server_default="0", default=0)
    expected_start_time: Mapped[Optional[DateTime]]
    next_scheduled_start_time: Mapped[Optional[DateTime]]
    start_time: Mapped[Optional[DateTime]]
1 change: 1 addition & 0 deletions src/prefect/server/models/events.py
@@ -55,6 +55,7 @@ async def flow_run_state_change_event(
"prefect.resource.id": f"prefect.flow-run.{flow_run.id}",
"prefect.resource.name": flow_run.name,
"prefect.run-count": str(flow_run.run_count),
"prefect.submission-count": str(flow_run.submission_count),
"prefect.state-message": truncated_to(
TRUNCATE_STATE_MESSAGES_AT, validated_state.message
),
22 changes: 22 additions & 0 deletions src/prefect/server/orchestration/global_policy.py
@@ -79,6 +79,7 @@ def priority() -> list[
        ) + [
            UpdateSubflowParentTask,
            UpdateSubflowStateDetails,
            IncrementFlowRunSubmissionCount,
            IncrementFlowRunCount,
            RemoveResumingIndicator,
        ]
@@ -254,6 +255,27 @@ async def before_transition(
)


class IncrementFlowRunSubmissionCount(
    FlowRunUniversalTransform[orm_models.FlowRun, core.FlowRunPolicy]
):
    """
    Records the number of times a run enters a pending state. For use with retries
    before the run reaches running.
    """

    async def before_transition(
        self, context: OrchestrationContext[orm_models.FlowRun, core.FlowRunPolicy]
    ) -> None:
        if self.nullified_transition():
            return

        if context.proposed_state is not None:
            # if entering a pending state...
            if context.proposed_state.is_pending():
                # increment the submission count
                context.run.submission_count += 1


class IncrementFlowRunCount(
    FlowRunUniversalTransform[orm_models.FlowRun, core.FlowRunPolicy]
):
3 changes: 3 additions & 0 deletions src/prefect/server/schemas/core.py
@@ -255,6 +255,9 @@ class FlowRun(TimeSeriesBaseModel, ORMBaseModel):
    run_count: int = Field(
        default=0, description="The number of times the flow run was executed."
    )
    submission_count: int = Field(
        default=0, description="The number of times the flow run was submitted."
    )
    expected_start_time: Optional[DateTime] = Field(
        default=None,
        description="The flow run's expected start time.",
3 changes: 3 additions & 0 deletions src/prefect/server/schemas/responses.py
@@ -246,6 +246,9 @@ class FlowRunResponse(ORMBaseModel):
    run_count: int = Field(
        default=0, description="The number of times the flow run was executed."
    )
    submission_count: int = Field(
        default=0, description="The number of times the flow run was submitted."
    )
    expected_start_time: Optional[DateTime] = Field(
        default=None,
        description="The flow run's expected start time.",
@@ -88,6 +88,7 @@ async def test_instrumenting_a_flow_run_state_change(
"prefect.resource.id": f"prefect.flow-run.{flow_run.id}",
"prefect.resource.name": flow_run.name,
"prefect.run-count": "0",
"prefect.submission-count": "0",
"prefect.state-message": "",
"prefect.state-name": "Running",
"prefect.state-timestamp": context.proposed_state.timestamp.isoformat(),
@@ -349,6 +350,7 @@ async def test_instrumenting_a_flow_run_with_no_flow(
"prefect.resource.id": f"prefect.flow-run.{flow_run.id}",
"prefect.resource.name": flow_run.name,
"prefect.run-count": "0",
"prefect.submission-count": "0",
"prefect.state-message": "",
"prefect.state-name": "Running",
"prefect.state-timestamp": context.proposed_state.timestamp.isoformat(),
Expand Down Expand Up @@ -805,6 +807,7 @@ async def test_cancelling_to_cancelled_transitions(
"prefect.resource.id": f"prefect.flow-run.{flow_run.id}",
"prefect.resource.name": flow_run.name,
"prefect.run-count": "0",
"prefect.submission-count": "0",
"prefect.state-message": "",
"prefect.state-name": "Cancelled",
"prefect.state-timestamp": updated_flow_run.state.timestamp.isoformat(),
@@ -813,6 +816,53 @@
)


async def test_submission_count_increments_without_running(
    session: AsyncSession,
    flow_run: ORMFlowRun,
    start_of_test: DateTime,
):
    """Test that submission_count increments when entering Pending state,
    even if the run never reaches Running state. This is the motivating case
    for tracking submission_count separately from run_count."""

    # Use the full orchestration API to set the flow run to Pending state
    # This will trigger GlobalFlowPolicy which increments submission_count
    await flow_runs.set_flow_run_state(
        session=session,
        flow_run_id=flow_run.id,
        state=State(type=StateType.PENDING),
    )
    await session.commit()

    assert AssertingEventsClient.last
    (pending_event,) = AssertingEventsClient.last.events

    assert pending_event.event == "prefect.flow-run.Pending"
    # After first transition to Pending, submission_count should be 1
    assert pending_event.resource["prefect.submission-count"] == "1"
    assert pending_event.resource["prefect.run-count"] == "0"

    # Now transition directly to Failed without going through Running
    # This simulates a crash before the run starts
    await flow_runs.set_flow_run_state(
        session=session,
        flow_run_id=flow_run.id,
        state=State(type=StateType.FAILED, message="Crashed before running"),
    )
    await session.commit()

    assert AssertingEventsClient.last
    (failed_event,) = AssertingEventsClient.last.events

    assert failed_event.event == "prefect.flow-run.Failed"
    # submission_count should still be 1, run_count should still be 0
    assert failed_event.resource["prefect.run-count"] == "0"  # Never reached Running
    assert (
        failed_event.resource["prefect.submission-count"] == "1"
    )  # Was submitted once
    assert failed_event.resource["prefect.state-message"] == "Crashed before running"


async def test_caches_resource_data(
    session: AsyncSession,
    work_queue,