Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions skynet/modules/monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,14 @@
labelnames=['processor'],
)

SUMMARY_CURRENT_TASKS_METRIC = Gauge(
'summary_current_tasks_by_processor',
documentation='Number of currently running tasks per processor',
namespace=PROMETHEUS_NAMESPACE,
subsystem=PROMETHEUS_SUMMARIES_SUBSYSTEM,
labelnames=['processor'],
)

SUMMARY_ERROR_COUNTER = Counter(
'summary_errors',
documentation='Number of jobs that have failed',
Expand Down
13 changes: 13 additions & 0 deletions skynet/modules/ttt/summaries/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from skynet.logs import get_logger
from skynet.modules.monitoring import (
OPENAI_API_RESTART_COUNTER,
SUMMARY_CURRENT_TASKS_METRIC,
SUMMARY_DURATION_METRIC,
SUMMARY_ERROR_COUNTER,
SUMMARY_FULL_DURATION_METRIC,
Expand Down Expand Up @@ -124,6 +125,13 @@ async def update_summary_queue_metric() -> None:
SUMMARY_QUEUE_SIZE_METRIC.set(total_queue_size)


async def update_current_tasks_metrics() -> None:
"""Update the current tasks metrics for all processors."""
for processor in get_all_processor_queue_keys():
current_task_count = len(current_tasks[processor])
SUMMARY_CURRENT_TASKS_METRIC.labels(processor=processor.value).set(current_task_count)


async def migrate_legacy_queues() -> None:
"""Migrate jobs from legacy single queues to processor-specific queues."""

Expand Down Expand Up @@ -378,10 +386,15 @@ def create_run_job_task(job: Job) -> asyncio.Task:

def remove_task(t):
current_tasks[processor].discard(t)
# Update metrics when task is removed
asyncio.create_task(update_current_tasks_metrics())

task.add_done_callback(remove_task)
current_tasks[processor].add(task)

# Update metrics when task is added
asyncio.create_task(update_current_tasks_metrics())

return task


Expand Down
Loading