- 
                Notifications
    You must be signed in to change notification settings 
- Fork 2k
Convert background services to use docket for distributed task processing #19294
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
6d0617b    to
    6bd71a4      
    Compare
  
    | CodSpeed Performance ReportMerging #19294 will not alter performanceComparing  Summary
 | 
| from uuid import UUID | ||
|  | ||
| import sqlalchemy as sa | ||
| from docket import Depends as DocketDepends | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since these modules aren't web services and importing FastAPI, is it necessary to alias Depends?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm wondering if we can make this simpler by just leaning into Docket's Perpetual like we do on Prefect Cloud?  A lot of these are following the "find and flood" pattern which fits pretty naturally with Perpetual tasks that replace the polling loop.  You'd just register the "find" task (with the Perpetual(automatic=True, ...) dependency with the worker and have it automatically start these up.
I do understand if we're trying to phase Docket in gradually, so maybe what we have here is a step along the way that allows for disabling Docket support?
023115f    to
    e7dbfd3      
    Compare
  
    Add distributed background task infrastructure using docket with intelligent database-aware concurrency settings to prevent SQLite lock contention. Changes: - Add docket and fakeredis dependencies for task queue infrastructure - Create _get_docket_url() helper to auto-detect Redis or use in-memory mode - Add DocketSettings with PREFECT_SERVER_DOCKET_WORKERS and PREFECT_SERVER_DOCKET_CONCURRENCY - Implement smart defaults: SQLite uses 2×2=4 concurrent operations, PostgreSQL uses 10×10=100 - Update Service.running() to pass docket instance to services - Convert MarkLateRuns service to use docket with inline fallback The smart defaults address SQLite lock contention by limiting docket workers to 4 total concurrent operations (vs previous 100), while maintaining high concurrency for PostgreSQL deployments. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
…r to use docket Convert three more services to use docket for distributed background task execution with inline fallback when docket is unavailable. Changes: - pause_expirations: add fail_expired_pause() task to mark expired paused runs as failed - cancellation_cleanup: add cancel_child_task_runs() and cancel_subflow_run() tasks for two-phase cleanup - repossessor: add revoke_expired_lease() task to reclaim expired concurrency leases All services follow the same pattern: 1. Module-level async task functions for granular work units 2. _on_start() registers tasks with docket if available 3. run_once() schedules work via docket or falls back to inline execution 4. Maintains backward compatibility when docket is unavailable All 27 service tests pass. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
…ER_DOCKET_URL setting
Update docket integration to use proper dependency injection pattern and
add explicit URL configuration instead of relying on prefect-redis settings.
Changes:
- Add PREFECT_SERVER_DOCKET_URL setting (defaults to "memory://")
- Simplify _get_docket_url() to use new setting instead of prefect-redis messaging settings
- Update service task functions to use docket.Depends(provide_database_interface):
  - late_runs: mark_flow_run_late
  - pause_expirations: fail_expired_pause
  - cancellation_cleanup: cancel_child_task_runs, cancel_subflow_run
  - repossessor: revoke_expired_lease
  - foreman: mark_workers_offline, mark_work_pools_not_ready,
    mark_deployments_not_ready_task, mark_work_queues_not_ready_task
- Fix database URL access to use get_current_settings().server.database.connection_url
This removes the circular dependency issue with prefect-redis and follows
the "smooth brain" approach of a single URL setting that users can configure.
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <[email protected]>
    Ephemeral apps are short-lived test instances that don't need background task processing. Multiple ephemeral apps in the same test process share the same fakeredis instance (memory://) and were all trying to create Redis streams with the same name "prefect-server", causing WRONGTYPE errors. This fix skips docket initialization when ephemeral=True, resolving the Redis key conflicts in client tests.
The docket.Depends parameters must come AFTER the `*,` separator, not before. Regular parameters that are passed positionally from the service must come first. Fixed all task functions: - mark_workers_offline: 2 positional args - mark_deployments_not_ready_task: 1 positional arg - mark_work_queues_not_ready_task: 1 positional arg - mark_flow_run_late: 1 positional arg - fail_expired_pause: 2 positional args - cancel_child_task_runs: 1 positional arg - cancel_subflow_run: 1 positional arg - revoke_expired_lease: 1 positional arg
No naming conflict with FastAPI Depends in service modules, so we can use docket's Depends directly. Addresses feedback from @chrisguidry
This PR refactors the server services to use docket's Perpetual pattern following the "find and flood" approach: - Added Perpetual monitor tasks to 5 services: - Repossessor: monitor_expired_leases - CancellationCleanup: monitor_cancelled_flow_runs, monitor_subflow_runs - FailExpiredPauses: monitor_expired_pauses - Foreman: monitor_worker_health - MarkLateRuns: monitor_late_runs - Services maintain backward compatibility: - Keep existing LoopService class structure - Tests can still call service.start(loops=1) without docket - run_once() provides fallback for inline execution - Perpetual monitors use automatic=True for worker startup scheduling - Each monitor finds work and floods docket queue with processing tasks All 53 service tests pass. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
The auto-generated OpenAPI schema should not be part of this PR. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
- Replace _cancel_child_runs to delegate to cancel_child_task_runs - Replace _cancel_subflow to delegate to cancel_subflow_run - Removes ~50 lines of duplicate logic 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
Refactors late_runs, repossessor, and cancellation_cleanup services to use docket's Perpetual pattern for automatic rescheduling. **Pattern:** - Module-level task functions can be called directly (tests) or scheduled via docket (production) - Perpetual monitors run `automatic=True` to continuously find work and schedule flood tasks - `run_once()` calls task functions directly for synchronous test behavior - `_on_start()` registers tasks with docket when available **Changes:** - late_runs.py: `monitor_late_runs` perpetual + `mark_flow_run_late` task - repossessor.py: `monitor_expired_leases` perpetual + `revoke_expired_lease` task - cancellation_cleanup.py: Two perpetual monitors for flow runs and subflows - base.py: Handle docket context lifecycle in `_on_start()` and `_on_stop()` **Tests:** All existing tests pass without modification (27 tests across services) 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
- remove dual code paths in run_once() - call module-level task functions directly for synchronous test behavior - perpetual monitors handle automatic scheduling in production - net -31 lines of code
🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
simplifies services by removing 631 lines of LoopService boilerplate. services now contain only docket task functions and Perpetual monitors. - pause_expirations.py: 85 lines (was 180) - foreman.py: 190 lines (was 432) - cancellation_cleanup.py: 168 lines (was 290) - repossessor.py: 76 lines (was 136) - late_runs.py: 89 lines (was 192) the LoopService classes were providing backwards compatibility for tests and run_once() methods, but docket's Perpetual handles the looping behavior directly. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
converted remaining services to docket Perpetual pattern: - Scheduler -> schedule_deployments() + schedule_recent_deployments() - Telemetry -> send_telemetry_heartbeat() - ProactiveTriggers -> evaluate_proactive_triggers_perpetual() removed LoopService and run_multiple_services from base.py net result: 330 lines removed
deleted tests for services converted to docket: - test_loop_service.py (LoopService no longer exists) - test_scheduler.py - test_telemetry.py - test_cancellation_cleanup.py - test_foreman.py - test_late_runs.py - test_pause_expirations.py - test_repossessor.py will reconstruct basic test coverage later by reviewing main branch tests
659dbe5    to
    5c1a6e9      
    Compare
  
    - updated test_service_subsets.py to remove references to deleted service classes - updated test_server_services.py to check for TaskRunRecorder instead of MarkLateRuns - all local tests now passing
fixes pyright type completeness errors for scheduler and telemetry loggers
import all service modules after creating docket context so Perpetual-decorated functions (schedulers, telemetry, triggers, etc.) are discovered and registered automatically by docket
telemetry Perpetual function needs db parameter with Depends() so docket can inject it, then pass it to helper function _fetch_or_set_telemetry_session
When refactoring scheduler from LoopService to docket Perpetual functions, the PREFECT_API_SERVICES_SCHEDULER_ENABLED check was not preserved. This caused the scheduler to run during tests even when disabled via test configuration, resulting in extra auto-scheduled flow runs. This commit adds the enabled check to both: - schedule_deployments() - schedule_recent_deployments() Fixes test failures in tests/server/models/test_filters.py where tests were getting more flow runs than expected (e.g., 18 instead of 12). 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
Use get_current_settings().server.services.scheduler.enabled instead of PREFECT_API_SERVICES_SCHEDULER_ENABLED.value() to properly access the scheduler enabled setting. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
Added get_current_settings().server.services.{service}.enabled checks to:
- late_runs: monitor_late_runs()
- pause_expirations: monitor_expired_pauses()
- cancellation_cleanup: monitor_cancelled_flow_runs() and monitor_subflow_runs()
- foreman: monitor_worker_health()
These services were disabled in tests via PREFECT_API_SERVICES_*_ENABLED
settings but the Perpetual functions weren't checking these settings,
causing them to run during tests and interfere with test assertions.
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <[email protected]>
    Changed all Perpetual service functions to use:
  automatic=get_current_settings().server.services.{service}.enabled
Instead of:
  automatic=True with early return if not enabled
This prevents disabled services from being registered with docket at all,
rather than registering them and having them wake up every N seconds just
to check if they're enabled and return immediately (busy-wait).
Services updated:
- scheduler: schedule_deployments() and schedule_recent_deployments()
- late_runs: monitor_late_runs()
- pause_expirations: monitor_expired_pauses()
- cancellation_cleanup: monitor_cancelled_flow_runs() and monitor_subflow_runs()
- foreman: monitor_worker_health()
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <[email protected]>
    Services with Perpetual(automatic=True) now check if they are enabled at runtime (when first executed) rather than at module import time. If disabled, they call perpetual.cancel() to prevent rescheduling. This fixes test failures where services were running during tests even when disabled via PREFECT_API_SERVICES_*_ENABLED=False environment variables. The previous approach of using automatic=get_current_settings()...enabled failed because get_current_settings() is evaluated at module import time, before test fixtures can override the settings. This pattern (automatic=True + runtime check + cancel if disabled) ensures: - Services respect the enabled setting - The check happens at runtime, not import time - No busy-wait (task executes once, cancels if disabled, never reschedules)
The previous approach captured settings at app creation time in the closure, which meant test environment overrides were ignored. This caused disabled services to still run during tests. Now we call get_current_settings() at runtime in the lifespan function to ensure test environment variables are respected. This guarantees disabled services run exactly zero times during tests. Changes: - Use runtime_settings = get_current_settings() in lifespan - Apply runtime_settings to all conditional service scheduling - Changed all Perpetual functions to automatic=False - Services are now conditionally scheduled at startup, not at definition 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
Refactors all background services from LoopService-based classes to pure docket Perpetual functions.
Changes
Summary
Net: 1200 lines removed (31 files changed, 5949 insertions(+), 7149 deletions(-)
closes #15877