diff --git a/skynet/modules/monitoring.py b/skynet/modules/monitoring.py
index 2cb6821..62eadcc 100644
--- a/skynet/modules/monitoring.py
+++ b/skynet/modules/monitoring.py
@@ -62,6 +62,14 @@
     labelnames=['processor'],
 )
 
+SUMMARY_CURRENT_TASKS_METRIC = Gauge(
+    'summary_current_tasks_by_processor',
+    documentation='Number of currently running tasks per processor',
+    namespace=PROMETHEUS_NAMESPACE,
+    subsystem=PROMETHEUS_SUMMARIES_SUBSYSTEM,
+    labelnames=['processor'],
+)
+
 SUMMARY_ERROR_COUNTER = Counter(
     'summary_errors',
     documentation='Number of jobs that have failed',
diff --git a/skynet/modules/ttt/summaries/jobs.py b/skynet/modules/ttt/summaries/jobs.py
index 222ce9e..320119c 100644
--- a/skynet/modules/ttt/summaries/jobs.py
+++ b/skynet/modules/ttt/summaries/jobs.py
@@ -29,6 +29,7 @@
 from skynet.logs import get_logger
 from skynet.modules.monitoring import (
     OPENAI_API_RESTART_COUNTER,
+    SUMMARY_CURRENT_TASKS_METRIC,
     SUMMARY_DURATION_METRIC,
     SUMMARY_ERROR_COUNTER,
     SUMMARY_FULL_DURATION_METRIC,
@@ -124,5 +125,21 @@ async def update_summary_queue_metric() -> None:
     SUMMARY_QUEUE_SIZE_METRIC.set(total_queue_size)
 
 
+def _refresh_current_tasks_gauge() -> None:
+    """Publish the number of tracked tasks of every processor to Prometheus.
+
+    Everything done here is synchronous, so this can be called directly from
+    task done-callbacks and other non-async code without scheduling a task.
+    """
+    for processor in get_all_processor_queue_keys():
+        current_task_count = len(current_tasks[processor])
+        SUMMARY_CURRENT_TASKS_METRIC.labels(processor=processor.value).set(current_task_count)
+
+
+async def update_current_tasks_metrics() -> None:
+    """Update the current tasks metrics for all processors."""
+    _refresh_current_tasks_gauge()
+
+
 async def migrate_legacy_queues() -> None:
     """Migrate jobs from legacy single queues to processor-specific queues."""
@@ -378,8 +395,14 @@ def create_run_job_task(job: Job) -> asyncio.Task:
 
     def remove_task(t):
         current_tasks[processor].discard(t)
+        # Refresh the gauge right away; it is plain synchronous work, so there
+        # is no need to spawn (and keep track of) another asyncio task here.
+        _refresh_current_tasks_gauge()
 
     task.add_done_callback(remove_task)
     current_tasks[processor].add(task)
 
+    # Keep the gauge in step with the task that was just registered.
+    _refresh_current_tasks_gauge()
+
     return task