Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
147a7b4
feat: add docket infrastructure with smart SQLite concurrency defaults
zzstoatzz Oct 28, 2025
cf1eca1
feat: convert pause_expirations, cancellation_cleanup, and repossesso…
zzstoatzz Oct 28, 2025
ef8abc3
feat: update service tasks to use docket.Depends and add PREFECT_SERV…
zzstoatzz Oct 28, 2025
845f7ff
regen schema
zzstoatzz Oct 28, 2025
316fcd7
fix: add type annotation for logger in repossessor.py
zzstoatzz Oct 28, 2025
89c7f11
fix: update docket to v0.12.0 release tag
zzstoatzz Oct 28, 2025
2c38658
fix: skip docket initialization for ephemeral apps
zzstoatzz Oct 28, 2025
4869577
fix: correct docket task function signatures for positional args
zzstoatzz Oct 28, 2025
e1df9a3
refactor: remove unnecessary DocketDepends alias
zzstoatzz Oct 29, 2025
e9cf687
refactor: integrate Perpetual pattern into LoopService classes
zzstoatzz Oct 29, 2025
486316a
chore: reset prefect.ts to main branch version
zzstoatzz Oct 29, 2025
047a8dd
refactor: consolidate duplicate code in cancellation_cleanup
zzstoatzz Oct 29, 2025
efe43c9
feat: refactor loop services to use docket Perpetual pattern
zzstoatzz Oct 29, 2025
16f6e5e
feat: refactor foreman and pause_expirations to use docket
zzstoatzz Oct 29, 2025
36259f6
add docket settings to supported settings test list
zzstoatzz Oct 29, 2025
268f86c
remove LoopService wrapper classes from docket-based services
zzstoatzz Oct 29, 2025
0ba3169
feat: completely remove LoopService class in favor of docket Perpetual
zzstoatzz Oct 29, 2025
702e663
test: remove service tests for converted services
zzstoatzz Oct 29, 2025
be98c37
fix: update tests for removed LoopService classes
zzstoatzz Oct 30, 2025
c4bc9cb
fix: add type annotations for logger variables
zzstoatzz Oct 30, 2025
94d552f
fix: register docket Perpetual functions during server startup
zzstoatzz Oct 30, 2025
7099161
fix: add db parameter to send_telemetry_heartbeat
zzstoatzz Oct 30, 2025
599997c
fix: correct type annotation for AsyncSession in scheduler.py
zzstoatzz Oct 30, 2025
89bd046
fix: add enabled check to scheduler perpetual functions
zzstoatzz Oct 30, 2025
f1ff3a3
fix: add enabled checks to all Perpetual service functions
zzstoatzz Oct 30, 2025
aa4d4e4
fix: use conditional automatic parameter for Perpetual services
zzstoatzz Oct 30, 2025
7af2ae1
fix: check enabled status at runtime in Perpetual services
zzstoatzz Oct 30, 2025
1848c22
fix: add timeout to worker task shutdown to prevent hanging
zzstoatzz Oct 31, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ dependencies = [
"uv>=0.6.0",
"semver>=3.0.4",
"pluggy>=1.6.0",
"pydocket",
]
[project.urls]
Changelog = "https://github.com/PrefectHQ/prefect/releases"
Expand Down Expand Up @@ -173,6 +174,7 @@ dev = [
"opentelemetry-instrumentation>=0.48b0,<1.0.0",
"opentelemetry-instrumentation-logging>=0.48b0,<1.0.0",
"opentelemetry-test-utils>=0.48b0,<1.0.0",
"fakeredis[lua]",
]

perf = ["logfire[fastapi,sqlalchemy]>=3.14.0", "pyinstrument>=5.0.0"]
Expand Down Expand Up @@ -351,3 +353,7 @@ check-hidden = true

[tool.uv]
required-version = ">=0.6.15" # make sure upload_time is included to avoid churn (see https://github.com/dependabot/dependabot-core/issues/12127)

[tool.uv.sources]
pydocket = { git = "https://github.com/chrisguidry/docket.git", tag = "0.12.0" }
fakeredis = { git = "https://github.com/zzstoatzz/fakeredis-py.git", rev = "fix-xpending-range-fields" }
142 changes: 140 additions & 2 deletions src/prefect/server/api/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -617,6 +617,20 @@ async def wrapper(*args: Any, **kwargs: Any) -> None:
return wrapper


async def _get_docket_url() -> str:
"""
Get the docket backend URL from settings.

Returns:
Docket URL string (redis://, rediss://, or memory://)
"""
settings = get_current_settings()
docket_url = settings.server.services.docket.url

logger.debug(f"Using docket URL: {docket_url}")
return docket_url


def create_app(
settings: Optional[prefect.settings.Settings] = None,
ephemeral: bool = False,
Expand Down Expand Up @@ -679,6 +693,9 @@ async def add_block_types():

@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
logger.info(
f"Lifespan starting: ephemeral={ephemeral}, webserver_only={webserver_only}"
)
if app in LIFESPAN_RAN_FOR_APP:
yield
return
Expand All @@ -695,11 +712,129 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
)

async with AsyncExitStack() as stack:
# Skip docket initialization for ephemeral apps (short-lived test instances)
# to avoid Redis key conflicts when multiple ephemeral apps share the same
# fakeredis instance (memory://)
docket = None
worker_tasks = []

if not ephemeral:
# Create docket instance for background tasks
from docket import Docket, Worker

# Use Redis if configured, otherwise in-memory
docket_url = await _get_docket_url()
docket = Docket(name="prefect-server", url=docket_url)
await stack.enter_async_context(docket)

# Import and register all docket-based background services
from prefect.server.events.services.triggers import (
evaluate_proactive_triggers_perpetual,
)
from prefect.server.services.cancellation_cleanup import (
monitor_cancelled_flow_runs,
monitor_subflow_runs,
)
from prefect.server.services.foreman import monitor_worker_health
from prefect.server.services.late_runs import monitor_late_runs
from prefect.server.services.pause_expirations import (
monitor_expired_pauses,
)
from prefect.server.services.repossessor import monitor_expired_leases
from prefect.server.services.scheduler import (
schedule_deployments,
schedule_recent_deployments,
)
from prefect.server.services.telemetry import send_telemetry_heartbeat

# Register all Perpetual functions with docket (always register, conditionally schedule)
docket.register(schedule_deployments)
docket.register(schedule_recent_deployments)
docket.register(send_telemetry_heartbeat)
docket.register(monitor_expired_pauses)
docket.register(monitor_worker_health)
docket.register(monitor_late_runs)
docket.register(monitor_expired_leases)
docket.register(monitor_cancelled_flow_runs)
docket.register(monitor_subflow_runs)
docket.register(evaluate_proactive_triggers_perpetual)

logger.info(f"Registered docket tasks: {list(docket.tasks.keys())}")

# Conditionally schedule Perpetual tasks based on enabled settings
# This ensures disabled services never run (not even once)
# IMPORTANT: Must call get_current_settings() here (not use closure variable)
# to pick up test environment overrides that occur after app creation
runtime_settings = get_current_settings()

# Note: send_telemetry_heartbeat and evaluate_proactive_triggers_perpetual
# use automatic=True and handle their own enabled checks
if runtime_settings.server.services.scheduler.enabled:
await docket.add(schedule_deployments)()
await docket.add(schedule_recent_deployments)()
if runtime_settings.server.services.pause_expirations.enabled:
await docket.add(monitor_expired_pauses)()
if runtime_settings.server.services.foreman.enabled:
await docket.add(monitor_worker_health)()
if runtime_settings.server.services.late_runs.enabled:
await docket.add(monitor_late_runs)()
if runtime_settings.server.services.cancellation_cleanup.enabled:
await docket.add(monitor_cancelled_flow_runs)()
await docket.add(monitor_subflow_runs)()
# repossessor and events services handle their own scheduling

# Get docket settings and smart defaults based on database type
docket_settings = runtime_settings.server.services.docket

# Detect database type for smart defaults
db_url = str(
runtime_settings.server.database.connection_url.get_secret_value()
)
is_sqlite = get_dialect(db_url).name == "sqlite"

# Smart defaults: SQLite uses lower concurrency to avoid lock contention
# Postgres can handle higher concurrency
num_workers = (
docket_settings.workers
if docket_settings.workers is not None
else (2 if is_sqlite else 10)
)
worker_concurrency = (
docket_settings.concurrency
if docket_settings.concurrency is not None
else (2 if is_sqlite else 10)
)

logger.debug(
f"Starting {num_workers} docket workers with concurrency "
f"{worker_concurrency} ({'SQLite' if is_sqlite else 'PostgreSQL'} mode)"
)

workers = []
for i in range(num_workers):
worker = Worker(docket, concurrency=worker_concurrency)
await stack.enter_async_context(worker)
workers.append(worker)

# Start workers in background
for worker in workers:
task = asyncio.create_task(worker.run_forever())
worker_tasks.append(task)

if Services:
await stack.enter_async_context(Services.running())
await stack.enter_async_context(Services.running(docket=docket))
LIFESPAN_RAN_FOR_APP.add(app)
yield

# Cancel worker tasks and wait for graceful shutdown
# Workers handle cancellation by finishing active tasks then stopping
# their scheduler loops. The scheduler may take up to one scheduling_resolution
# period to notice the shutdown signal, so we allow time for clean exit.
for task in worker_tasks:
task.cancel()
with anyio.move_on_after(10):
await asyncio.gather(*worker_tasks, return_exceptions=True)

def on_service_exit(service: Service, task: asyncio.Task[None]) -> None:
"""
Added as a callback for completion of services to log exit
Expand Down Expand Up @@ -750,8 +885,11 @@ def on_service_exit(service: Service, task: asyncio.Task[None]) -> None:
# Limit the number of concurrent requests when using a SQLite database to reduce
# chance of errors where the database cannot be opened due to a high number of
# concurrent writes
settings = get_current_settings()
if (
get_dialect(prefect.settings.PREFECT_API_DATABASE_CONNECTION_URL.value()).name
get_dialect(
str(settings.server.database.connection_url.get_secret_value())
).name
== "sqlite"
):
_install_sqlite_locked_log_filter()
Expand Down
35 changes: 15 additions & 20 deletions src/prefect/server/events/services/triggers.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
from __future__ import annotations

import asyncio
from typing import TYPE_CHECKING, Any, NoReturn, Optional
from datetime import timedelta
from typing import TYPE_CHECKING, NoReturn

from docket import Perpetual

from prefect.logging import get_logger
from prefect.server.events import triggers
from prefect.server.services.base import LoopService, RunInEphemeralServers, Service
from prefect.server.services.base import RunInEphemeralServers, Service
from prefect.server.utilities.messaging import Consumer, create_consumer
from prefect.server.utilities.messaging._consumer_names import (
generate_unique_consumer_name,
Expand Down Expand Up @@ -62,21 +65,13 @@ async def stop(self) -> None:
logger.debug("Reactive triggers stopped")


class ProactiveTriggers(RunInEphemeralServers, LoopService):
"""Evaluates proactive automation triggers"""

@classmethod
def service_settings(cls) -> ServicesBaseSetting:
return get_current_settings().server.services.triggers

def __init__(self, loop_seconds: Optional[float] = None, **kwargs: Any):
super().__init__(
loop_seconds=(
loop_seconds
or PREFECT_EVENTS_PROACTIVE_GRANULARITY.value().total_seconds()
),
**kwargs,
)

async def run_once(self) -> None:
await triggers.evaluate_proactive_triggers()
async def evaluate_proactive_triggers_perpetual(
perpetual: Perpetual = Perpetual(
automatic=True,
every=timedelta(
seconds=PREFECT_EVENTS_PROACTIVE_GRANULARITY.value().total_seconds()
),
),
) -> None:
"""Evaluates proactive automation triggers (Perpetual task)."""
await triggers.evaluate_proactive_triggers()
Loading
Loading