Skip to content

Commit 99a847b

Browse files
authored
feat: separate jobs processing per individual queues per processor (#231)
1 parent 295d4a5 commit 99a847b

File tree

6 files changed: 807 additions and 64 deletions.

skynet/constants.py

Lines changed: 15 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -2,9 +2,21 @@
2 2

3 3
response_prefix = 'SkynetResponse'
4 4

5-
PENDING_JOBS_KEY = 'jobs:pending'
6-
RUNNING_JOBS_KEY = 'jobs:running'
7-
ERROR_JOBS_KEY = 'jobs:error'
5+
# Processor-specific queue keys
6+
PENDING_JOBS_OPENAI_KEY = 'jobs:pending:openai'
7+
PENDING_JOBS_AZURE_KEY = 'jobs:pending:azure'
8+
PENDING_JOBS_OCI_KEY = 'jobs:pending:oci'
9+
PENDING_JOBS_LOCAL_KEY = 'jobs:pending:local'
10+
11+
RUNNING_JOBS_OPENAI_KEY = 'jobs:running:openai'
12+
RUNNING_JOBS_AZURE_KEY = 'jobs:running:azure'
13+
RUNNING_JOBS_OCI_KEY = 'jobs:running:oci'
14+
RUNNING_JOBS_LOCAL_KEY = 'jobs:running:local'
15+
16+
ERROR_JOBS_OPENAI_KEY = 'jobs:error:openai'
17+
ERROR_JOBS_AZURE_KEY = 'jobs:error:azure'
18+
ERROR_JOBS_OCI_KEY = 'jobs:error:oci'
19+
ERROR_JOBS_LOCAL_KEY = 'jobs:error:local'
8 20

9 21

10 22
class Locale(Enum):

skynet/env.py

Lines changed: 6 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -115,6 +115,12 @@ def tobool(val: str | None):
115 115
job_timeout = int(os.environ.get('JOB_TIMEOUT', 60 * 5)) # 5 minutes default
116 116
max_concurrency = int(os.environ.get('MAX_CONCURRENCY', 5))
117 117

118+
# per-processor concurrency limits
119+
max_concurrency_openai = int(os.environ.get('MAX_CONCURRENCY_OPENAI', 5))
120+
max_concurrency_azure = int(os.environ.get('MAX_CONCURRENCY_AZURE', 5))
121+
max_concurrency_oci = int(os.environ.get('MAX_CONCURRENCY_OCI', 5))
122+
max_concurrency_local = int(os.environ.get('MAX_CONCURRENCY_LOCAL', 5))
123+
118 124
# summaries
119 125
summary_minimum_payload_length = int(os.environ.get('SUMMARY_MINIMUM_PAYLOAD_LENGTH', 100))
120 126
enable_batching = tobool(os.environ.get('ENABLE_BATCHING', 'true'))

skynet/metrics.py

Lines changed: 18 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -1,4 +1,9 @@
1-
from skynet.constants import PENDING_JOBS_KEY
1+
from skynet.constants import (
2+
PENDING_JOBS_AZURE_KEY,
3+
PENDING_JOBS_LOCAL_KEY,
4+
PENDING_JOBS_OCI_KEY,
5+
PENDING_JOBS_OPENAI_KEY,
6+
)
2 7
from skynet.env import enable_metrics, modules
3 8
from skynet.logs import get_logger
4 9
from skynet.modules.monitoring import (
@@ -29,8 +34,18 @@ async def autoscaler_metrics():
29 34
Metrics required for the autoscaler.
30 35
'''
31 36

32-
queue_size = await db.llen(PENDING_JOBS_KEY)
33-
return {'queueSize': queue_size}
37+
# Sum up queue sizes from all processor-specific queues
38+
total_queue_size = 0
39+
for queue_key in [
40+
PENDING_JOBS_OPENAI_KEY,
41+
PENDING_JOBS_AZURE_KEY,
42+
PENDING_JOBS_OCI_KEY,
43+
PENDING_JOBS_LOCAL_KEY,
44+
]:
45+
queue_size = await db.llen(queue_key)
46+
total_queue_size += queue_size
47+
48+
return {'queueSize': total_queue_size}
34 49

35 50
if 'summaries:dispatcher' in modules:
36 51
from skynet.modules.ttt.summaries.app import app as summaries_app

0 commit comments

Comments
 (0)