|
7 | 7 | import requests |
8 | 8 | from dotenv import load_dotenv |
9 | 9 |
|
| 10 | +from anomstack.config import specs |
| 11 | + |
10 | 12 |
|
11 | 13 | log = logging.getLogger("fasthtml") |
12 | 14 |
|
@@ -92,12 +94,29 @@ def get_enabled_dagster_jobs(host: str = "localhost", port: str = "3000") -> lis |
92 | 94 | return [] |
93 | 95 |
|
94 | 96 |
|
95 | | -def get_metric_batches(): |
| 97 | +def get_metric_batches(source: str = "all"): |
96 | 98 | """ |
97 | | - Get the metric batches from the enabled Dagster jobs. |
| 99 | + Get the metric batches from the enabled Dagster jobs or from the config. |
| 100 | +
|
| 101 | + Args: |
| 102 | + source (str): The source of the metric batches. |
| 103 | + (e.g., "dagster" or "config" or "all"). |
| 104 | +
|
| 105 | + Returns: |
| 106 | + list: A list of metric batches. |
98 | 107 | """ |
99 | | - enabled_jobs = get_enabled_dagster_jobs(host="http://localhost", port="3000") |
100 | | - ingest_jobs = [job for job in enabled_jobs if job.endswith("_ingest")] |
101 | | - metric_batches = [job[:-7] for job in ingest_jobs if job.endswith("_ingest")] |
| 108 | + metric_batches = [] |
| 109 | + dagster_enabled_jobs = get_enabled_dagster_jobs(host="http://localhost", port="3000") |
| 110 | + dagster_ingest_jobs = [job for job in dagster_enabled_jobs if job.endswith("_ingest")] |
| 111 | + dagster_metric_batches = [job[:-7] for job in dagster_ingest_jobs if job.endswith("_ingest")] |
| 112 | + config_metric_batches = list(specs.keys()) |
| 113 | + if source == "dagster": |
| 114 | + metric_batches = dagster_metric_batches |
| 115 | + elif source == "config": |
| 116 | + metric_batches = config_metric_batches |
| 117 | + elif source == "all": |
| 118 | + metric_batches = dagster_metric_batches + config_metric_batches |
| 119 | + else: |
| 120 | + raise ValueError(f"Invalid source: {source}") |
102 | 121 |
|
103 | 122 | return metric_batches |
0 commit comments