Skip to content

Commit e7dbfd3

Browse files
zzstoatzzclaude
andcommitted
refactor: integrate Perpetual pattern into LoopService classes
This PR refactors the server services to use docket's Perpetual pattern following the "find and flood" approach: - Added Perpetual monitor tasks to 5 services: - Repossessor: monitor_expired_leases - CancellationCleanup: monitor_cancelled_flow_runs, monitor_subflow_runs - FailExpiredPauses: monitor_expired_pauses - Foreman: monitor_worker_health - MarkLateRuns: monitor_late_runs - Services maintain backward compatibility: - Keep existing LoopService class structure - Tests can still call service.start(loops=1) without docket - run_once() provides fallback for inline execution - Perpetual monitors use automatic=True for worker startup scheduling - Each monitor finds work and floods docket queue with processing tasks All 53 service tests pass. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
1 parent fa81709 commit e7dbfd3

File tree

5 files changed

+206
-9
lines changed

5 files changed

+206
-9
lines changed

src/prefect/server/services/cancellation_cleanup.py

Lines changed: 73 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
from uuid import UUID
99

1010
import sqlalchemy as sa
11-
from docket import Depends
11+
from docket import CurrentDocket, Depends, Docket, Perpetual
1212
from sqlalchemy.sql.expression import or_
1313

1414
import prefect.server.models as models
@@ -108,6 +108,75 @@ async def cancel_subflow_run(
108108
)
109109

110110

111+
# Perpetual monitor for cancelled flow runs with child tasks (find and flood pattern)
112+
async def monitor_cancelled_flow_runs(
113+
docket: Docket = CurrentDocket(),
114+
db: PrefectDBInterface = Depends(provide_database_interface),
115+
perpetual: Perpetual = Perpetual(
116+
automatic=True,
117+
every=datetime.timedelta(
118+
seconds=PREFECT_API_SERVICES_CANCELLATION_CLEANUP_LOOP_SECONDS.value()
119+
),
120+
),
121+
) -> None:
122+
"""Monitor for cancelled flow runs and schedule child task cancellation."""
123+
batch_size = 200
124+
cancelled_flow_query = (
125+
sa.select(db.FlowRun.id)
126+
.where(
127+
db.FlowRun.state_type == states.StateType.CANCELLED,
128+
db.FlowRun.end_time.is_not(None),
129+
db.FlowRun.end_time >= (now("UTC") - datetime.timedelta(days=1)),
130+
)
131+
.order_by(db.FlowRun.id)
132+
.limit(batch_size)
133+
)
134+
135+
async with db.session_context() as session:
136+
flow_run_result = await session.execute(cancelled_flow_query)
137+
flow_run_ids = flow_run_result.scalars().all()
138+
139+
for flow_run_id in flow_run_ids:
140+
await docket.add(cancel_child_task_runs)(flow_run_id)
141+
142+
143+
# Perpetual monitor for subflow runs that need cancellation (find and flood pattern)
144+
async def monitor_subflow_runs(
145+
docket: Docket = CurrentDocket(),
146+
db: PrefectDBInterface = Depends(provide_database_interface),
147+
perpetual: Perpetual = Perpetual(
148+
automatic=True,
149+
every=datetime.timedelta(
150+
seconds=PREFECT_API_SERVICES_CANCELLATION_CLEANUP_LOOP_SECONDS.value()
151+
),
152+
),
153+
) -> None:
154+
"""Monitor for subflow runs that need to be cancelled."""
155+
batch_size = 200
156+
subflow_query = (
157+
sa.select(db.FlowRun.id)
158+
.where(
159+
or_(
160+
db.FlowRun.state_type == states.StateType.PENDING,
161+
db.FlowRun.state_type == states.StateType.SCHEDULED,
162+
db.FlowRun.state_type == states.StateType.RUNNING,
163+
db.FlowRun.state_type == states.StateType.PAUSED,
164+
db.FlowRun.state_type == states.StateType.CANCELLING,
165+
),
166+
db.FlowRun.parent_task_run_id.is_not(None),
167+
)
168+
.order_by(db.FlowRun.id)
169+
.limit(batch_size)
170+
)
171+
172+
async with db.session_context() as session:
173+
subflow_run_result = await session.execute(subflow_query)
174+
subflow_run_ids = subflow_run_result.scalars().all()
175+
176+
for subflow_run_id in subflow_run_ids:
177+
await docket.add(cancel_subflow_run)(subflow_run_id)
178+
179+
111180
class CancellationCleanup(LoopService):
112181
"""
113182
Cancels tasks and subflows of flow runs that have been cancelled
@@ -131,6 +200,9 @@ async def _on_start(self) -> None:
131200
"""Register docket tasks if docket is available."""
132201
await super()._on_start()
133202
if self.docket is not None:
203+
# Register monitor (find) and processing (flood) tasks
204+
self.docket.register(monitor_cancelled_flow_runs)
205+
self.docket.register(monitor_subflow_runs)
134206
self.docket.register(cancel_child_task_runs)
135207
self.docket.register(cancel_subflow_run)
136208

src/prefect/server/services/foreman.py

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from typing import Any, Optional
77

88
import sqlalchemy as sa
9-
from docket import Depends
9+
from docket import CurrentDocket, Depends, Docket, Perpetual
1010

1111
from prefect.server import models
1212
from prefect.server.database import (
@@ -174,6 +174,29 @@ async def mark_work_queues_not_ready_task(
174174
)
175175

176176

177+
# Perpetual monitor for worker/work pool health (find and flood pattern)
178+
async def monitor_worker_health(
179+
docket: Docket = CurrentDocket(),
180+
perpetual: Perpetual = Perpetual(
181+
automatic=True,
182+
every=timedelta(seconds=PREFECT_API_SERVICES_FOREMAN_LOOP_SECONDS.value()),
183+
),
184+
) -> None:
185+
"""Monitor worker and work pool health, scheduling monitoring tasks."""
186+
# Schedule all foreman monitoring tasks in parallel
187+
await docket.add(mark_workers_offline)(
188+
PREFECT_API_SERVICES_FOREMAN_INACTIVITY_HEARTBEAT_MULTIPLE.value(),
189+
PREFECT_API_SERVICES_FOREMAN_FALLBACK_HEARTBEAT_INTERVAL_SECONDS.value(),
190+
)
191+
await docket.add(mark_work_pools_not_ready)()
192+
await docket.add(mark_deployments_not_ready_task)(
193+
PREFECT_API_SERVICES_FOREMAN_DEPLOYMENT_LAST_POLLED_TIMEOUT_SECONDS.value(),
194+
)
195+
await docket.add(mark_work_queues_not_ready_task)(
196+
PREFECT_API_SERVICES_FOREMAN_WORK_QUEUE_LAST_POLLED_TIMEOUT_SECONDS.value(),
197+
)
198+
199+
177200
class Foreman(LoopService):
178201
"""
179202
Monitors the status of workers and their associated work pools
@@ -222,6 +245,8 @@ async def _on_start(self) -> None:
222245
"""Register docket tasks if docket is available."""
223246
await super()._on_start()
224247
if self.docket is not None:
248+
# Register monitor (find) and processing (flood) tasks
249+
self.docket.register(monitor_worker_health)
225250
self.docket.register(mark_workers_offline)
226251
self.docket.register(mark_work_pools_not_ready)
227252
self.docket.register(mark_deployments_not_ready_task)

src/prefect/server/services/late_runs.py

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,12 @@
77

88
import asyncio
99
import datetime
10+
from datetime import timedelta
1011
from typing import Any
1112
from uuid import UUID
1213

1314
import sqlalchemy as sa
14-
from docket import Depends
15+
from docket import CurrentDocket, Depends, Docket, Perpetual
1516
from sqlalchemy.ext.asyncio import AsyncSession
1617

1718
import prefect.server.models as models
@@ -64,6 +65,40 @@ async def mark_flow_run_late(
6465
return # Flow run was deleted during processing
6566

6667

68+
# Perpetual monitor for late flow runs (find and flood pattern)
69+
async def monitor_late_runs(
70+
docket: Docket = CurrentDocket(),
71+
db: PrefectDBInterface = Depends(provide_database_interface),
72+
perpetual: Perpetual = Perpetual(
73+
automatic=True,
74+
every=timedelta(seconds=PREFECT_API_SERVICES_LATE_RUNS_LOOP_SECONDS.value()),
75+
),
76+
) -> None:
77+
"""Monitor for late flow runs and schedule marking tasks."""
78+
batch_size = 400
79+
mark_late_after = PREFECT_API_SERVICES_LATE_RUNS_AFTER_SECONDS.value()
80+
scheduled_to_start_before = now("UTC") - datetime.timedelta(
81+
seconds=mark_late_after.total_seconds()
82+
)
83+
84+
async with db.session_context() as session:
85+
query = (
86+
sa.select(db.FlowRun.id, db.FlowRun.next_scheduled_start_time)
87+
.where(
88+
(db.FlowRun.next_scheduled_start_time <= scheduled_to_start_before),
89+
db.FlowRun.state_type == states.StateType.SCHEDULED,
90+
db.FlowRun.state_name == "Scheduled",
91+
)
92+
.limit(batch_size)
93+
)
94+
result = await session.execute(query)
95+
runs = result.all()
96+
97+
# Schedule each run to be marked late
98+
for run in runs:
99+
await docket.add(mark_flow_run_late)(run.id)
100+
101+
67102
class MarkLateRuns(LoopService):
68103
"""
69104
Finds flow runs that are later than their scheduled start time
@@ -93,9 +128,11 @@ def __init__(self, loop_seconds: float | None = None, **kwargs: Any):
93128
self.batch_size = 400
94129

95130
async def _on_start(self) -> None:
96-
"""Register docket task if docket is available."""
131+
"""Register docket tasks if docket is available."""
97132
await super()._on_start()
98133
if self.docket is not None:
134+
# Register monitor (find) and processing (flood) tasks
135+
self.docket.register(monitor_late_runs)
99136
self.docket.register(mark_flow_run_late)
100137

101138
@db_injector

src/prefect/server/services/pause_expirations.py

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,12 @@
33
"""
44

55
import asyncio
6+
from datetime import timedelta
67
from typing import Any, Optional
78
from uuid import UUID
89

910
import sqlalchemy as sa
10-
from docket import Depends
11+
from docket import CurrentDocket, Depends, Docket, Perpetual
1112
from sqlalchemy.ext.asyncio import AsyncSession
1213

1314
import prefect.server.models as models
@@ -54,6 +55,43 @@ async def fail_expired_pause(
5455
)
5556

5657

58+
# Perpetual monitor for expired paused flow runs (find and flood pattern)
59+
async def monitor_expired_pauses(
60+
docket: Docket = CurrentDocket(),
61+
db: PrefectDBInterface = Depends(provide_database_interface),
62+
perpetual: Perpetual = Perpetual(
63+
automatic=True,
64+
every=timedelta(
65+
seconds=PREFECT_API_SERVICES_PAUSE_EXPIRATIONS_LOOP_SECONDS.value()
66+
),
67+
),
68+
) -> None:
69+
"""Monitor for expired paused flow runs and schedule failure tasks."""
70+
batch_size = 200
71+
async with db.session_context() as session:
72+
query = (
73+
sa.select(db.FlowRun)
74+
.where(
75+
db.FlowRun.state_type == states.StateType.PAUSED,
76+
)
77+
.limit(batch_size)
78+
)
79+
80+
result = await session.execute(query)
81+
runs = result.scalars().all()
82+
83+
# Schedule each expired run to be marked failed
84+
for run in runs:
85+
if (
86+
run.state is not None
87+
and run.state.state_details.pause_timeout is not None
88+
and run.state.state_details.pause_timeout < now("UTC")
89+
):
90+
await docket.add(fail_expired_pause)(
91+
run.id, str(run.state.state_details.pause_timeout)
92+
)
93+
94+
5795
class FailExpiredPauses(LoopService):
5896
"""
5997
Fails flow runs that have been paused and never resumed
@@ -74,9 +112,11 @@ def __init__(self, loop_seconds: Optional[float] = None, **kwargs: Any):
74112
self.batch_size = 200
75113

76114
async def _on_start(self) -> None:
77-
"""Register docket task if docket is available."""
115+
"""Register docket tasks if docket is available."""
78116
await super()._on_start()
79117
if self.docket is not None:
118+
# Register monitor (find) and processing (flood) tasks
119+
self.docket.register(monitor_expired_pauses)
80120
self.docket.register(fail_expired_pause)
81121

82122
@db_injector

src/prefect/server/services/repossessor.py

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
1-
from datetime import datetime, timezone
1+
from datetime import datetime, timedelta, timezone
22
from logging import Logger
33

4-
from docket import Depends
4+
from docket import CurrentDocket, Depends, Docket, Perpetual
55

66
from prefect.logging import get_logger
77
from prefect.server.concurrency.lease_storage import (
@@ -53,6 +53,27 @@ async def revoke_expired_lease(
5353
await session.commit()
5454

5555

56+
# Perpetual monitor task for finding expired leases (find and flood pattern)
57+
async def monitor_expired_leases(
58+
docket: Docket = CurrentDocket(),
59+
perpetual: Perpetual = Perpetual(
60+
automatic=True,
61+
every=timedelta(
62+
seconds=get_current_settings().server.services.repossessor.loop_seconds
63+
),
64+
),
65+
) -> None:
66+
"""Monitor for expired concurrency leases and schedule revocation tasks."""
67+
concurrency_lease_storage = get_concurrency_lease_storage()
68+
expired_lease_ids = await concurrency_lease_storage.read_expired_lease_ids()
69+
70+
if expired_lease_ids:
71+
logger.info(f"Found {len(expired_lease_ids)} expired leases")
72+
for expired_lease_id in expired_lease_ids:
73+
await docket.add(revoke_expired_lease)(expired_lease_id)
74+
logger.info(f"Scheduled {len(expired_lease_ids)} lease revocation tasks.")
75+
76+
5677
class Repossessor(LoopService):
5778
"""
5879
Handles the reconciliation of expired leases; no tow truck dependency.
@@ -71,9 +92,11 @@ def service_settings(cls) -> ServicesBaseSetting:
7192
return get_current_settings().server.services.repossessor
7293

7394
async def _on_start(self) -> None:
74-
"""Register docket task if docket is available."""
95+
"""Register docket tasks if docket is available."""
7596
await super()._on_start()
7697
if self.docket is not None:
98+
# Register the monitor (find) and revoke (flood) tasks
99+
self.docket.register(monitor_expired_leases)
77100
self.docket.register(revoke_expired_lease)
78101

79102
async def run_once(self) -> None:

0 commit comments

Comments
 (0)