47 changes: 47 additions & 0 deletions src/prefect/server/api/flow_runs.py
@@ -602,6 +602,53 @@ async def delete_flow_run_logs(db: PrefectDBInterface, flow_run_id: UUID) -> Non
)


@router.post("/bulk_delete")
async def bulk_delete_flow_runs(
background_tasks: BackgroundTasks,
flow_runs: schemas.filters.FlowRunFilter = Body(
..., description="Filter to identify flow runs to delete"
),
limit: int = Body(
200, le=200, ge=1, description="Maximum number of flow runs to delete"
),
db: PrefectDBInterface = Depends(provide_database_interface),
) -> schemas.responses.FlowRunBulkDeleteResponse:
"""
Bulk delete flow runs using a filter.

    This endpoint deletes flow runs matching the provided filter criteria
    and cleans up related resources such as concurrency slots; logs for the
    deleted runs are removed in a background task.
"""
result = schemas.responses.FlowRunBulkDeleteResponse()

async with db.session_context() as session:
        # Read the matching flow runs in a read-only session; deletion runs
        # in its own transaction below
runs = await models.flow_runs.read_flow_runs(
session=session,
flow_run_filter=flow_runs,
limit=limit,
)

flow_run_ids = [run.id for run in runs]

if not flow_run_ids:
return result

# Delete flow runs
async with db.session_context(begin_transaction=True) as session:
deleted_ids = await models.flow_runs.bulk_delete_flow_runs(
session=session,
flow_run_ids=flow_run_ids,
)
result.deleted = deleted_ids

# Schedule log cleanup in background
for flow_run_id in result.deleted:
background_tasks.add_task(delete_flow_run_logs, db, flow_run_id)

return result


@router.post("/{id:uuid}/set_state")
async def set_flow_run_state(
response: Response,
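For reviewers, a minimal sketch of calling the new endpoint from outside the server, assuming a Prefect API reachable at the base URL shown; the URL, helper name, and tag are illustrative, and the request body mirrors the payloads used in the tests below:

import asyncio

import httpx

PREFECT_API_URL = "http://127.0.0.1:4200/api"  # assumed local API address


async def bulk_delete_by_tag(tag: str, limit: int = 200) -> list[str]:
    # POST a FlowRunFilter plus a limit, matching the endpoint's Body parameters
    async with httpx.AsyncClient(base_url=PREFECT_API_URL) as client:
        response = await client.post(
            "/flow_runs/bulk_delete",
            json={"flow_runs": {"tags": {"all_": [tag]}}, "limit": limit},
        )
        response.raise_for_status()
        # Response body matches FlowRunBulkDeleteResponse: {"deleted": [...]}
        return response.json()["deleted"]


if __name__ == "__main__":
    deleted = asyncio.run(bulk_delete_by_tag("bulk-delete-test"))
    print(f"deleted {len(deleted)} flow runs")
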
31 changes: 31 additions & 0 deletions src/prefect/server/models/flow_runs.py
@@ -504,6 +504,37 @@ async def delete_flow_run(
return result.rowcount > 0


@db_injector
async def bulk_delete_flow_runs(
db: PrefectDBInterface,
session: AsyncSession,
flow_run_ids: list[UUID],
) -> list[UUID]:
"""
Bulk delete flow runs by ID.

Args:
session: A database session
flow_run_ids: List of flow run IDs to delete

Returns:
list[UUID]: IDs of successfully deleted flow runs
"""
deleted_ids: list[UUID] = []

    # Delete each run individually; failures are logged and skipped rather
    # than aborting the whole batch
for flow_run_id in flow_run_ids:
try:
# Use the existing delete_flow_run which handles cleanup
success = await delete_flow_run(session=session, flow_run_id=flow_run_id)
if success:
deleted_ids.append(flow_run_id)
except Exception as e:
logger.warning(f"Failed to delete flow run {flow_run_id}: {e}")

return deleted_ids


async def set_flow_run_state(
session: AsyncSession,
flow_run_id: UUID,
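A rough sketch of how the new model helper is driven from the API layer above (not part of the diff; `db` stands in for the injected PrefectDBInterface):

from uuid import UUID

from prefect.server import models


async def delete_runs(db, flow_run_ids: list[UUID]) -> list[UUID]:
    # Single transaction around the batch; each run still goes through
    # delete_flow_run, so existing per-run cleanup (e.g. concurrency slots)
    # is reused rather than reimplemented.
    async with db.session_context(begin_transaction=True) as session:
        return await models.flow_runs.bulk_delete_flow_runs(
            session=session,
            flow_run_ids=flow_run_ids,
        )
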
9 changes: 9 additions & 0 deletions src/prefect/server/schemas/responses.py
@@ -668,3 +668,12 @@ class SchemaValueIndexError(BaseModel):
class SchemaValuesValidationResponse(BaseModel):
errors: List[SchemaValueError]
valid: bool


class FlowRunBulkDeleteResponse(PrefectBaseModel):
"""Response from bulk flow run deletion operation."""

deleted: List[UUID] = Field(
default_factory=list,
description="List of flow run IDs that were successfully deleted",
)
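
A small illustration of the response shape, assuming the pydantic v2 serialization API used by the other server schemas:

from uuid import uuid4

from prefect.server.schemas.responses import FlowRunBulkDeleteResponse

# Only the IDs that were actually deleted are reported; runs that failed to
# delete are omitted (see bulk_delete_flow_runs above).
response = FlowRunBulkDeleteResponse(deleted=[uuid4()])
print(response.model_dump_json())  # {"deleted": ["<some-uuid>"]}
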
249 changes: 249 additions & 0 deletions tests/server/orchestration/api/test_flow_runs.py
@@ -1380,6 +1380,255 @@ async def read_logs():
assert all([log.flow_run_id is None for log in logs])


class TestBulkDeleteFlowRuns:
async def test_bulk_delete_by_ids(
self,
flow,
client,
session,
):
"""Test bulk deleting flow runs by specific IDs."""
from prefect.server import models

# Create 5 flow runs
flow_run_ids = []
for i in range(5):
flow_run = await models.flow_runs.create_flow_run(
session=session,
flow_run=schemas.core.FlowRun(
flow_id=flow.id,
name=f"test-run-{i}",
tags=["bulk-delete-test"],
),
)
flow_run_ids.append(flow_run.id)
await session.commit()

# Verify all exist
for flow_run_id in flow_run_ids:
run = await models.flow_runs.read_flow_run(
session=session, flow_run_id=flow_run_id
)
assert run is not None

# Bulk delete first 3 by ID
delete_response = await client.post(
"/flow_runs/bulk_delete",
json={
"flow_runs": {"id": {"any_": [str(frid) for frid in flow_run_ids[:3]]}},
"limit": 200,
},
)
assert delete_response.status_code == 200
result = delete_response.json()

# Verify exactly 3 were deleted
assert len(result["deleted"]) == 3
assert all(UUID(d) in flow_run_ids[:3] for d in result["deleted"])

# Verify only 2 remain
remaining_count = 0
for flow_run_id in flow_run_ids:
run = await models.flow_runs.read_flow_run(
session=session, flow_run_id=flow_run_id
)
if run is not None:
remaining_count += 1
assert flow_run_id in flow_run_ids[3:]
assert remaining_count == 2

async def test_bulk_delete_by_tags(
self,
flow,
client,
session,
):
"""Test bulk deleting flow runs by tags."""
from prefect.server import models

# Create flow runs with different tags
tagged_ids = []
for i in range(3):
flow_run = await models.flow_runs.create_flow_run(
session=session,
flow_run=schemas.core.FlowRun(
flow_id=flow.id,
name=f"tagged-run-{i}",
tags=["to-delete", "batch-1"],
),
)
tagged_ids.append(flow_run.id)

# Create flow runs without the tag
untagged_ids = []
for i in range(2):
flow_run = await models.flow_runs.create_flow_run(
session=session,
flow_run=schemas.core.FlowRun(
flow_id=flow.id,
name=f"untagged-run-{i}",
tags=["keep"],
),
)
untagged_ids.append(flow_run.id)
await session.commit()

# Bulk delete by tag
delete_response = await client.post(
"/flow_runs/bulk_delete",
json={
"flow_runs": {"tags": {"all_": ["to-delete"]}},
"limit": 200,
},
)
assert delete_response.status_code == 200
result = delete_response.json()

# Verify tagged runs were deleted
assert len(result["deleted"]) == 3
assert all(UUID(d) in tagged_ids for d in result["deleted"])

# Verify untagged runs still exist
for flow_run_id in untagged_ids:
run = await models.flow_runs.read_flow_run(
session=session, flow_run_id=flow_run_id
)
assert run is not None

async def test_bulk_delete_limit(
self,
flow,
client,
session,
):
"""Test that the limit parameter is enforced."""
from prefect.server import models

# Create 10 flow runs
flow_run_ids = []
for i in range(10):
flow_run = await models.flow_runs.create_flow_run(
session=session,
flow_run=schemas.core.FlowRun(
flow_id=flow.id,
name=f"limit-test-run-{i}",
tags=["limit-test"],
),
)
flow_run_ids.append(flow_run.id)
await session.commit()

# Bulk delete with limit of 5
delete_response = await client.post(
"/flow_runs/bulk_delete",
json={
"flow_runs": {"tags": {"all_": ["limit-test"]}},
"limit": 5,
},
)
assert delete_response.status_code == 200
result = delete_response.json()

# Verify only 5 were deleted
assert len(result["deleted"]) == 5

# Verify 5 still exist
remaining_count = 0
for flow_run_id in flow_run_ids:
run = await models.flow_runs.read_flow_run(
session=session, flow_run_id=flow_run_id
)
if run is not None:
remaining_count += 1
assert remaining_count == 5

async def test_bulk_delete_validation(
self,
flow,
client,
):
"""Test validation of bulk delete parameters."""
# Test limit too high
response = await client.post(
"/flow_runs/bulk_delete",
json={
"flow_runs": {"tags": {"all_": ["test"]}},
"limit": 201,
},
)
assert response.status_code == 422

# Test limit too low
response = await client.post(
"/flow_runs/bulk_delete",
json={
"flow_runs": {"tags": {"all_": ["test"]}},
"limit": 0,
},
)
assert response.status_code == 422

async def test_bulk_delete_with_concurrency_slots(
self,
flow,
client,
session,
deployment_with_concurrency_limit,
):
"""Test that concurrency slots are released when flow runs are deleted."""
from prefect.server import models

# Create flow runs with the deployment that has concurrency limits
flow_run_ids = []
for i in range(3):
flow_run = await models.flow_runs.create_flow_run(
session=session,
flow_run=schemas.core.FlowRun(
flow_id=flow.id,
deployment_id=deployment_with_concurrency_limit.id,
name=f"concurrency-test-{i}",
tags=["concurrency-test"],
state=schemas.states.State(type="RUNNING"),
),
)
flow_run_ids.append(flow_run.id)

# Take 3 active slots
await models.concurrency_limits_v2.bulk_increment_active_slots(
session=session,
concurrency_limit_ids=[
deployment_with_concurrency_limit.concurrency_limit_id
],
slots=3,
)
await session.commit()

# Verify slots were taken
concurrency_limit = await models.concurrency_limits_v2.read_concurrency_limit(
session=session,
concurrency_limit_id=deployment_with_concurrency_limit.concurrency_limit_id,
)
await session.refresh(concurrency_limit)
assert concurrency_limit.active_slots == 3

# Bulk delete the flow runs
delete_response = await client.post(
"/flow_runs/bulk_delete",
json={
"flow_runs": {"tags": {"all_": ["concurrency-test"]}},
"limit": 200,
},
)
assert delete_response.status_code == 200
result = delete_response.json()

assert len(result["deleted"]) == 3

# Verify slots were released
await session.refresh(concurrency_limit)
assert concurrency_limit.active_slots == 0


class TestResumeFlowrun:
@pytest.fixture
async def paused_flow_run_waiting_for_input(