@zzstoatzz zzstoatzz commented Oct 28, 2025

Refactors all background services from LoopService-based classes to pure docket Perpetual functions.

Changes

  • Completely removed the LoopService base class (210+ lines)
  • Converted 8 background services to docket Perpetual functions:
    • Scheduler (schedule_deployments, schedule_recent_deployments)
    • Telemetry (send_telemetry_heartbeat)
    • Late runs monitoring
    • Pause expirations
    • Foreman (worker health)
    • Repossessor (expired leases)
    • Cancellation cleanup
    • Proactive triggers
  • Services now use the find-and-flood pattern for better concurrency
  • Deleted 5 test files for converted services (7 test files deleted in total)
  • Added docket configuration settings with smart defaults (SQLite: 2 workers/2 concurrency, PostgreSQL: 10/10)

Summary

Net: 1200 lines removed (31 files changed, 5949 insertions(+), 7149 deletions(-))

closes #15877

codspeed-hq bot commented Oct 28, 2025

CodSpeed Performance Report

Merging #19294 will not alter performance

Comparing docket-for-loops (ce73729) with main (ee25381)

Summary

✅ 2 untouched

from uuid import UUID

import sqlalchemy as sa
from docket import Depends as DocketDepends

Since these modules aren't web services and don't import FastAPI, is it necessary to alias Depends?


@chrisguidry chrisguidry left a comment

I'm wondering if we can make this simpler by just leaning into Docket's Perpetual like we do on Prefect Cloud? A lot of these are following the "find and flood" pattern, which fits pretty naturally with Perpetual tasks that replace the polling loop. You'd just register the "find" task (with the Perpetual(automatic=True, ...) dependency) with the worker and have it automatically start these up.

I do understand if we're trying to phase Docket in gradually, so maybe what we have here is a step along the way that allows for disabling Docket support?

@zzstoatzz zzstoatzz force-pushed the docket-for-loops branch 2 times, most recently from 023115f to e7dbfd3 Compare October 29, 2025 18:02
zzstoatzz and others added 18 commits October 29, 2025 16:32
Add distributed background task infrastructure using docket with intelligent
database-aware concurrency settings to prevent SQLite lock contention.

Changes:
- Add docket and fakeredis dependencies for task queue infrastructure
- Create _get_docket_url() helper to auto-detect Redis or use in-memory mode
- Add DocketSettings with PREFECT_SERVER_DOCKET_WORKERS and PREFECT_SERVER_DOCKET_CONCURRENCY
- Implement smart defaults: SQLite uses 2×2=4 concurrent operations, PostgreSQL uses 10×10=100
- Update Service.running() to pass docket instance to services
- Convert MarkLateRuns service to use docket with inline fallback

The smart defaults address SQLite lock contention by limiting docket workers
to 4 total concurrent operations (vs previous 100), while maintaining high
concurrency for PostgreSQL deployments.
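
A minimal sketch of the database-aware defaults described above, assuming the choice is made from the dialect prefix of the connection URL. The names `DocketDefaults` and `default_docket_settings` are hypothetical and are not taken from this PR; treating every non-SQLite URL as PostgreSQL is also an assumption.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class DocketDefaults:
    """Hypothetical container for the two tunables named in the commit."""

    workers: int
    concurrency: int

    @property
    def max_concurrent_operations(self) -> int:
        # Total parallel operations is workers multiplied by per-worker concurrency.
        return self.workers * self.concurrency


def default_docket_settings(database_url: str) -> DocketDefaults:
    """Pick conservative defaults for SQLite and higher ones otherwise."""
    # SQLAlchemy-style URLs start with the dialect, e.g. "sqlite+aiosqlite://".
    dialect = database_url.split(":", 1)[0].split("+", 1)[0].lower()
    if dialect == "sqlite":
        # 2 x 2 = 4 concurrent operations to limit lock contention.
        return DocketDefaults(workers=2, concurrency=2)
    # Assumption: anything else is treated like PostgreSQL (10 x 10 = 100).
    return DocketDefaults(workers=10, concurrency=10)
```

An explicit PREFECT_SERVER_DOCKET_WORKERS or PREFECT_SERVER_DOCKET_CONCURRENCY value would presumably take precedence over these defaults; that override is not modelled here.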

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <[email protected]>
…r to use docket

Convert three more services to use docket for distributed background task execution
with inline fallback when docket is unavailable.

Changes:
- pause_expirations: add fail_expired_pause() task to mark expired paused runs as failed
- cancellation_cleanup: add cancel_child_task_runs() and cancel_subflow_run() tasks for two-phase cleanup
- repossessor: add revoke_expired_lease() task to reclaim expired concurrency leases

All services follow the same pattern:
1. Module-level async task functions for granular work units
2. _on_start() registers tasks with docket if available
3. run_once() schedules work via docket or falls back to inline execution
4. Maintains backward compatibility when docket is unavailable
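
The four steps above can be sketched with the standard library alone. `QueueStandIn` is a hypothetical stand-in for a task queue and is not docket's API; the service class name and the single-argument signature of `fail_expired_pause` are simplifications for illustration.

```python
import asyncio
from typing import Awaitable, Callable, Optional

Task = Callable[[str], Awaitable[None]]
processed: list[str] = []


# Step 1: module-level task function, one granular unit of work.
async def fail_expired_pause(flow_run_id: str) -> None:
    processed.append(flow_run_id)


class QueueStandIn:
    """Hypothetical stand-in for a task queue; NOT docket's API."""

    def __init__(self) -> None:
        self.registered: dict[str, Task] = {}
        self.pending: list[tuple[str, str]] = []

    def register(self, task: Task) -> None:
        self.registered[task.__name__] = task

    def enqueue(self, task: Task, arg: str) -> None:
        self.pending.append((task.__name__, arg))

    async def drain(self) -> None:
        # Run queued work in order, as a worker would.
        while self.pending:
            name, arg = self.pending.pop(0)
            await self.registered[name](arg)


class PauseExpirationService:
    def __init__(self, queue: Optional[QueueStandIn] = None) -> None:
        self.queue = queue

    def on_start(self) -> None:
        # Step 2: register tasks only when a queue is available.
        if self.queue is not None:
            self.queue.register(fail_expired_pause)

    async def run_once(self, expired_ids: list[str]) -> None:
        for flow_run_id in expired_ids:
            if self.queue is not None:
                # Step 3: schedule the work for a worker to pick up.
                self.queue.enqueue(fail_expired_pause, flow_run_id)
            else:
                # Step 4: inline fallback keeps the previous behaviour.
                await fail_expired_pause(flow_run_id)
```

Because the task is a plain module-level coroutine, the same function serves both paths, which is what lets the existing tests keep passing without a queue.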

All 27 service tests pass.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <[email protected]>
…ER_DOCKET_URL setting

Update docket integration to use proper dependency injection pattern and
add explicit URL configuration instead of relying on prefect-redis settings.

Changes:
- Add PREFECT_SERVER_DOCKET_URL setting (defaults to "memory://")
- Simplify _get_docket_url() to use new setting instead of prefect-redis messaging settings
- Update service task functions to use docket.Depends(provide_database_interface):
  - late_runs: mark_flow_run_late
  - pause_expirations: fail_expired_pause
  - cancellation_cleanup: cancel_child_task_runs, cancel_subflow_run
  - repossessor: revoke_expired_lease
  - foreman: mark_workers_offline, mark_work_pools_not_ready,
    mark_deployments_not_ready_task, mark_work_queues_not_ready_task
- Fix database URL access to use get_current_settings().server.database.connection_url

This removes the circular dependency issue with prefect-redis and follows
the "smooth brain" approach of a single URL setting that users can configure.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <[email protected]>
Ephemeral apps are short-lived test instances that don't need background
task processing. Multiple ephemeral apps in the same test process share
the same fakeredis instance (memory://) and were all trying to create
Redis streams with the same name "prefect-server", causing WRONGTYPE errors.

This fix skips docket initialization when ephemeral=True, resolving the
Redis key conflicts in client tests.

The docket.Depends parameters must come AFTER the `*,` separator, not before.
Regular parameters that are passed positionally from the service must come first.

Fixed all task functions:
- mark_workers_offline: 2 positional args
- mark_deployments_not_ready_task: 1 positional arg
- mark_work_queues_not_ready_task: 1 positional arg
- mark_flow_run_late: 1 positional arg
- fail_expired_pause: 2 positional args
- cancel_child_task_runs: 1 positional arg
- cancel_subflow_run: 1 positional arg
- revoke_expired_lease: 1 positional arg
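
A small sketch of the parameter layout this commit describes: caller-supplied arguments first, then `*,`, then injected parameters as keyword-only. `Injected` is a hypothetical marker standing in for a dependency default; the real code uses docket.Depends(provide_database_interface), which is not imported here.

```python
import inspect


class Injected:
    """Hypothetical marker standing in for an injected-dependency default."""


# One positional argument from the service, then keyword-only injected params.
async def mark_flow_run_late(flow_run_id: str, *, db: object = Injected()) -> None:
    ...


def positional_arg_names(func) -> list[str]:
    """Names of parameters a caller may pass positionally."""
    params = inspect.signature(func).parameters.values()
    return [p.name for p in params if p.kind is inspect.Parameter.POSITIONAL_OR_KEYWORD]


def injected_arg_names(func) -> list[str]:
    """Names of keyword-only parameters whose default is the marker."""
    params = inspect.signature(func).parameters.values()
    return [
        p.name
        for p in params
        if p.kind is inspect.Parameter.KEYWORD_ONLY and isinstance(p.default, Injected)
    ]
```

With this layout a positional call such as `mark_flow_run_late(run_id)` can never bind a value to `db` by accident.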
No naming conflict with FastAPI Depends in service modules,
so we can use docket's Depends directly.

Addresses feedback from @chrisguidry

This PR refactors the server services to use docket's Perpetual pattern following the "find and flood" approach:

- Added Perpetual monitor tasks to 5 services:
  - Repossessor: monitor_expired_leases
  - CancellationCleanup: monitor_cancelled_flow_runs, monitor_subflow_runs
  - FailExpiredPauses: monitor_expired_pauses
  - Foreman: monitor_worker_health
  - MarkLateRuns: monitor_late_runs

- Services maintain backward compatibility:
  - Keep existing LoopService class structure
  - Tests can still call service.start(loops=1) without docket
  - run_once() provides fallback for inline execution

- Perpetual monitors use automatic=True for worker startup scheduling
- Each monitor finds work and floods docket queue with processing tasks
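
For readers unfamiliar with "find and flood", here is a sketch of one cycle using only asyncio. An `asyncio.Queue` stands in for the docket queue, and `find_late_runs` is a hypothetical query; neither reflects docket's real API or this PR's database code.

```python
import asyncio

late_marked: list[int] = []


async def find_late_runs() -> list[int]:
    """Hypothetical 'find' query returning IDs of runs that need work."""
    return [1, 2, 3]


async def mark_flow_run_late(flow_run_id: int) -> None:
    """Per-item processing task."""
    late_marked.append(flow_run_id)


async def monitor_late_runs(queue: asyncio.Queue) -> int:
    """Find work, then flood the queue with one task per item."""
    ids = await find_late_runs()
    for flow_run_id in ids:
        queue.put_nowait(flow_run_id)
    return len(ids)


async def worker(queue: asyncio.Queue) -> None:
    """Process queued items until cancelled."""
    while True:
        flow_run_id = await queue.get()
        try:
            await mark_flow_run_late(flow_run_id)
        finally:
            queue.task_done()


async def run_one_cycle(concurrency: int = 2) -> int:
    queue: asyncio.Queue = asyncio.Queue()
    workers = [asyncio.create_task(worker(queue)) for _ in range(concurrency)]
    scheduled = await monitor_late_runs(queue)
    await queue.join()  # wait until every flooded task has been processed
    for task in workers:
        task.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return scheduled
```

The monitor stays cheap because it only enqueues; the per-item work runs concurrently in the workers. In the PR, a Perpetual monitor plays the role that a repeated call to `run_one_cycle` would play here.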

All 53 service tests pass.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <[email protected]>
The auto-generated OpenAPI schema should not be part of this PR.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <[email protected]>
- Replace _cancel_child_runs to delegate to cancel_child_task_runs
- Replace _cancel_subflow to delegate to cancel_subflow_run
- Removes ~50 lines of duplicate logic

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <[email protected]>
Refactors late_runs, repossessor, and cancellation_cleanup services to use docket's Perpetual pattern for automatic rescheduling.

**Pattern:**
- Module-level task functions can be called directly (tests) or scheduled via docket (production)
- Perpetual monitors run with `automatic=True` to continuously find work and schedule flood tasks
- `run_once()` calls task functions directly for synchronous test behavior
- `_on_start()` registers tasks with docket when available

**Changes:**
- late_runs.py: `monitor_late_runs` perpetual + `mark_flow_run_late` task
- repossessor.py: `monitor_expired_leases` perpetual + `revoke_expired_lease` task
- cancellation_cleanup.py: Two perpetual monitors for flow runs and subflows
- base.py: Handle docket context lifecycle in `_on_start()` and `_on_stop()`

**Tests:** All existing tests pass without modification (27 tests across services)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <[email protected]>
- remove dual code paths in run_once()
- call module-level task functions directly for synchronous test behavior
- perpetual monitors handle automatic scheduling in production
- net -31 lines of code

simplifies services by removing 631 lines of LoopService boilerplate.
services now contain only docket task functions and Perpetual monitors.

- pause_expirations.py: 85 lines (was 180)
- foreman.py: 190 lines (was 432)
- cancellation_cleanup.py: 168 lines (was 290)
- repossessor.py: 76 lines (was 136)
- late_runs.py: 89 lines (was 192)

the LoopService classes were providing backwards compatibility for
tests and run_once() methods, but docket's Perpetual handles the
looping behavior directly.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <[email protected]>
converted remaining services to docket Perpetual pattern:
- Scheduler -> schedule_deployments() + schedule_recent_deployments()
- Telemetry -> send_telemetry_heartbeat()
- ProactiveTriggers -> evaluate_proactive_triggers_perpetual()

removed LoopService and run_multiple_services from base.py

net result: 330 lines removed

deleted tests for services converted to docket:
- test_loop_service.py (LoopService no longer exists)
- test_scheduler.py
- test_telemetry.py
- test_cancellation_cleanup.py
- test_foreman.py
- test_late_runs.py
- test_pause_expirations.py
- test_repossessor.py

will reconstruct basic test coverage later by reviewing main branch tests

- updated test_service_subsets.py to remove references to deleted service classes
- updated test_server_services.py to check for TaskRunRecorder instead of MarkLateRuns
- all local tests now passing

fixes pyright type completeness errors for scheduler and telemetry loggers

import all service modules after creating docket context so Perpetual-decorated
functions (schedulers, telemetry, triggers, etc.) are discovered and registered
automatically by docket

telemetry Perpetual function needs db parameter with Depends() so docket can
inject it, then pass it to helper function _fetch_or_set_telemetry_session
@github-actions github-actions bot added the upstream dependency label Oct 30, 2025
zzstoatzz and others added 8 commits October 30, 2025 13:28
When refactoring scheduler from LoopService to docket Perpetual functions,
the PREFECT_API_SERVICES_SCHEDULER_ENABLED check was not preserved.
This caused the scheduler to run during tests even when disabled via
test configuration, resulting in extra auto-scheduled flow runs.

This commit adds the enabled check to both:
- schedule_deployments()
- schedule_recent_deployments()

Fixes test failures in tests/server/models/test_filters.py where tests
were getting more flow runs than expected (e.g., 18 instead of 12).

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <[email protected]>
Use get_current_settings().server.services.scheduler.enabled instead
of PREFECT_API_SERVICES_SCHEDULER_ENABLED.value() to properly access
the scheduler enabled setting.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <[email protected]>
Added get_current_settings().server.services.{service}.enabled checks to:
- late_runs: monitor_late_runs()
- pause_expirations: monitor_expired_pauses()
- cancellation_cleanup: monitor_cancelled_flow_runs() and monitor_subflow_runs()
- foreman: monitor_worker_health()

These services were disabled in tests via PREFECT_API_SERVICES_*_ENABLED
settings but the Perpetual functions weren't checking these settings,
causing them to run during tests and interfere with test assertions.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <[email protected]>
Changed all Perpetual service functions to use:
  automatic=get_current_settings().server.services.{service}.enabled

Instead of:
  automatic=True with early return if not enabled

This prevents disabled services from being registered with docket at all,
rather than registering them and having them wake up every N seconds just
to check if they're enabled and return immediately (busy-wait).

Services updated:
- scheduler: schedule_deployments() and schedule_recent_deployments()
- late_runs: monitor_late_runs()
- pause_expirations: monitor_expired_pauses()
- cancellation_cleanup: monitor_cancelled_flow_runs() and monitor_subflow_runs()
- foreman: monitor_worker_health()

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <[email protected]>
Services with Perpetual(automatic=True) now check if they are enabled
at runtime (when first executed) rather than at module import time.
If disabled, they call perpetual.cancel() to prevent rescheduling.

This fixes test failures where services were running during tests even
when disabled via PREFECT_API_SERVICES_*_ENABLED=False environment
variables.

The previous approach of using automatic=get_current_settings()...enabled
failed because get_current_settings() is evaluated at module import time,
before test fixtures can override the settings.

This pattern (automatic=True + runtime check + cancel if disabled) ensures:
- Services respect the enabled setting
- The check happens at runtime, not import time
- No busy-wait (task executes once, cancels if disabled, never reschedules)
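
The import-time versus runtime distinction can be reproduced in plain Python. This is a sketch only: `current_settings` and `PerpetualStandIn` are hypothetical stand-ins for get_current_settings() and docket's Perpetual, and only the `cancel()` call mentioned in this commit is modelled.

```python
settings = {"enabled": True}


def current_settings() -> dict:
    """Hypothetical stand-in for the real settings accessor."""
    return settings


# A default argument is evaluated once, when the function is defined,
# much like automatic=get_current_settings()...enabled at import time.
def monitor_import_time(automatic: bool = current_settings()["enabled"]) -> bool:
    return automatic


class PerpetualStandIn:
    """Hypothetical stand-in; only models cancel()."""

    def __init__(self) -> None:
        self.cancelled = False

    def cancel(self) -> None:
        self.cancelled = True


# The setting is read on each call, so later overrides are visible.
def monitor_with_runtime_check(perpetual: PerpetualStandIn) -> str:
    if not current_settings()["enabled"]:
        perpetual.cancel()  # stop rescheduling instead of waking up again
        return "skipped"
    return "ran"


# Simulate a test fixture disabling the service after import.
settings["enabled"] = False
```

After the override, `monitor_import_time()` still reports the stale value captured at definition time, while `monitor_with_runtime_check` sees the change and cancels.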
The previous approach captured settings at app creation time in the
closure, which meant test environment overrides were ignored. This
caused disabled services to still run during tests.

Now we call get_current_settings() at runtime in the lifespan function
to ensure test environment variables are respected. This guarantees
disabled services run exactly zero times during tests.

Changes:
- Use runtime_settings = get_current_settings() in lifespan
- Apply runtime_settings to all conditional service scheduling
- Changed all Perpetual functions to automatic=False
- Services are now conditionally scheduled at startup, not at definition
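
A sketch of conditional scheduling at startup, assuming a lifespan-style async context manager. `read_settings`, `SERVICE_NAMES`, and the `scheduled` list are hypothetical; the real lifespan schedules docket tasks rather than appending names to a list.

```python
import asyncio
from contextlib import asynccontextmanager

settings = {"scheduler": True, "late_runs": False, "foreman": True}
SERVICE_NAMES = ("scheduler", "late_runs", "foreman")


def read_settings() -> dict:
    """Hypothetical stand-in for get_current_settings()."""
    return dict(settings)


@asynccontextmanager
async def lifespan(scheduled: list):
    # Read settings when the app starts, not when this function is defined,
    # so overrides applied after import are respected.
    runtime_settings = read_settings()
    for name in SERVICE_NAMES:
        if runtime_settings[name]:
            scheduled.append(name)  # stands in for scheduling the task
    try:
        yield
    finally:
        scheduled.clear()  # nothing stays scheduled after shutdown


async def start_and_list() -> list:
    scheduled: list = []
    async with lifespan(scheduled):
        return list(scheduled)
```

A disabled service is never scheduled at all under this arrangement, which matches the stated goal of disabled services running exactly zero times.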

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <[email protected]>
Labels

docs, upstream dependency

3 participants