| 1 | +"""Utility module for validating OpenTelemetry integration metrics. |
| 2 | +
|
| 3 | +This module provides reusable components for testing OTel receiver metrics: |
| 4 | +- Loading metric specifications from JSON files |
| 5 | +- Retrieving metrics from OTel Collector logs |
| 6 | +- Validating metrics against specifications |
| 7 | +- Querying metrics from the Datadog backend |
| 8 | +""" |
| 9 | + |
| 10 | +import json |
| 11 | +import time |
| 12 | +from pathlib import Path |
| 13 | +from typing import TYPE_CHECKING, Any |
| 14 | + |
| 15 | +from utils import interfaces, logger |
| 16 | + |
| 17 | +if TYPE_CHECKING: |
| 18 | + from utils._context._scenarios.otel_collector import OtelCollectorScenario |
| 19 | + |
| 20 | + |
| 21 | +class OtelMetricsValidator: |
| 22 | + """Base class for validating OTel integration metrics.""" |
| 23 | + |
| 24 | + def __init__(self, metrics_spec: dict[str, dict[str, str]]) -> None: |
| 25 | + """Initialize the validator with a metrics specification.""" |
| 26 | + self.metrics_spec = metrics_spec |
| 27 | + |
| 28 | + @staticmethod |
| 29 | + def load_metrics_from_file( |
| 30 | + metrics_file: Path, excluded_metrics: set[str] | None = None |
| 31 | + ) -> dict[str, dict[str, str]]: |
        """Load metric specifications from a JSON file, excluding excluded_metrics (if provided).
        Return the remaining specifications as a dict keyed by metric name.
        """
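        # Illustrative sketch of the expected file layout, inferred from how the
        # validator reads entries ("data_type" and "description" keys); the metric
        # name and wording here are made-up examples, not taken from a real spec file:
        # {
        #   "example.receiver.metric": {
        #     "data_type": "Sum",
        #     "description": "Example description for the metric."
        #   }
        # }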
        if not metrics_file.exists():
            raise FileNotFoundError(f"Metrics file not found: {metrics_file}")

        with open(metrics_file, encoding="utf-8") as f:
            metrics = json.load(f)

        if excluded_metrics:
            return {k: v for k, v in metrics.items() if k not in excluded_metrics}

        return metrics

    @staticmethod
    def get_collector_metrics(collector_log_path: str) -> list[dict[str, Any]]:
        """Retrieve metrics from the OTel Collector's file exporter logs.
        Given the path to the collector's metrics log file, return one batch dictionary per JSON line.
        """
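        # The file exporter is assumed to write one JSON object per line; the
        # processing helpers below only rely on the nesting sketched here
        # (other OTLP fields may be present and are ignored):
        # {"resourceMetrics": [{"scopeMetrics": [{"metrics": [{"name": "...", ...}]}]}]}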
        assert Path(collector_log_path).exists(), f"Metrics log file not found: {collector_log_path}"

        metrics_batch = []
        with open(collector_log_path, encoding="utf-8") as f:
            for row in f:
                line = row.strip()
                if line:
                    metrics_batch.append(json.loads(line))

        return metrics_batch

    def process_and_validate_metrics(
        self, metrics_batch: list[dict[str, Any]]
    ) -> tuple[set[str], set[str], list[str], list[str]]:
        """Process a metrics batch and validate it against the loaded metric specifications.
        Returns (found_metrics, metrics_dont_match_spec, validation_results, failed_validations)
        """
        found_metrics: set[str] = set()
        metrics_dont_match_spec: set[str] = set()

        for data in metrics_batch:
            self._process_metrics_data(data, found_metrics, metrics_dont_match_spec)

        validation_results = []
        failed_validations = []

        # Check that all expected metrics were found
        for metric_name in self.metrics_spec:
            if metric_name in found_metrics:
                result = f"✅ {metric_name}"
                validation_results.append(result)
            else:
                result = f"❌ {metric_name}"
                validation_results.append(result)
                failed_validations.append(result)

        # Add spec mismatches to failures
        for spec_mismatch in metrics_dont_match_spec:
            failed_validations.append(f"❌ Spec mismatch: {spec_mismatch}")
            validation_results.append(f"❌ Spec mismatch: {spec_mismatch}")

        return found_metrics, metrics_dont_match_spec, validation_results, failed_validations

    def _process_metrics_data(
        self, data: dict[str, Any], found_metrics: set[str], metrics_dont_match_spec: set[str]
    ) -> None:
        """Process top-level metrics data structure."""
        if "resourceMetrics" not in data:
            return

        for resource_metric in data["resourceMetrics"]:
            self._process_resource_metric(resource_metric, found_metrics, metrics_dont_match_spec)

    def _process_resource_metric(
        self, resource_metric: dict[str, Any], found_metrics: set[str], metrics_dont_match_spec: set[str]
    ) -> None:
        """Process resource-level metrics."""
        if "scopeMetrics" not in resource_metric:
            return

        for scope_metric in resource_metric["scopeMetrics"]:
            self._process_scope_metric(scope_metric, found_metrics, metrics_dont_match_spec)

    def _process_scope_metric(
        self, scope_metric: dict[str, Any], found_metrics: set[str], metrics_dont_match_spec: set[str]
    ) -> None:
        """Process scope-level metrics."""
        if "metrics" not in scope_metric:
            return

        for metric in scope_metric["metrics"]:
            self._process_individual_metric(metric, found_metrics, metrics_dont_match_spec)

    def _process_individual_metric(
        self, metric: dict[str, Any], found_metrics: set[str], metrics_dont_match_spec: set[str]
    ) -> None:
        """Process and validate individual metric."""
        if "name" not in metric:
            return

        metric_name = metric["name"]
        found_metrics.add(metric_name)

        # Skip validation if metric is not in our expected list
        if metric_name not in self.metrics_spec:
            return

        self._validate_metric_specification(metric, metrics_dont_match_spec)

    def _validate_metric_specification(self, metric: dict[str, Any], metrics_dont_match_spec: set[str]) -> None:
        """Validate that a metric matches its expected specification."""
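        # Illustrative shape of a single OTLP JSON metric as consumed here; only the
        # keys checked below matter, and a real entry carries additional fields:
        # {"name": "...", "description": "...", "sum": {...}}  # or "gauge": {...}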
        metric_name = metric["name"]
        # The description field is optional in OTLP JSON, so fall back to an empty string.
        description = metric.get("description", "")
        gauge_type = "gauge" in metric
        sum_type = "sum" in metric

        expected_spec = self.metrics_spec[metric_name]
        expected_type = expected_spec["data_type"].lower()
        expected_description = expected_spec["description"]

        # Validate metric type
        if expected_type == "sum" and not sum_type:
            metrics_dont_match_spec.add(f"{metric_name}: Expected Sum type but metric is not a Sum")
        elif expected_type == "gauge" and not gauge_type:
            metrics_dont_match_spec.add(f"{metric_name}: Expected Gauge type but metric is not a Gauge")

        # Validate description (the spec sometimes ends with a period while the actual logs don't)
        if description.rstrip(".") != expected_description.rstrip("."):
            metrics_dont_match_spec.add(
                f"{metric_name}: Description mismatch - Expected: '{expected_description}', Got: '{description}'"
            )

    def query_backend_for_metrics(
        self,
        metric_names: list[str],
        query_tags: dict[str, str],
        lookback_seconds: int = 300,
        retries: int = 3,
        initial_delay_s: float = 15.0,
        semantic_mode: str = "combined",
    ) -> tuple[list[str], list[str]]:
        """Query the Datadog backend to validate that the metrics were received.
        Returns (validated_metrics, failed_metrics)
        """
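        # Sketch of the response shape the parsing below relies on (assumed from
        # this method's own access patterns, not from backend API documentation):
        # {"data": [{"attributes": {"times": [...], "values": [[...], ...]}}],
        #  "meta": {"responses": [{"results_warnings": ...}]}}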
        end_time = int(time.time())
        start_time = end_time - lookback_seconds

        validated_metrics = []
        failed_metrics = []

        # Build tag string for query
        tag_string = ",".join(f"{k}:{v}" for k, v in query_tags.items())

        for metric_name in metric_names:
            logger.info(f"Looking at metric: {metric_name}")
            try:
                start_time_ms = start_time * 1000
                end_time_ms = end_time * 1000

                query_str = f"avg:{metric_name}{{{tag_string}}}"
                logger.info(f"Query: {query_str}, time range: {start_time_ms} to {end_time_ms} ({lookback_seconds}s)")

                metric_data = interfaces.backend.query_ui_timeseries(
                    query=query_str,
                    start=start_time_ms,
                    end=end_time_ms,
                    semantic_mode=semantic_mode,
                    retries=retries,
                    initial_delay_s=initial_delay_s,
                )

                if metric_data and metric_data.get("data") and len(metric_data["data"]) > 0:
                    data_item = metric_data["data"][0]
                    attributes = data_item.get("attributes", {})

                    meta_responses = metric_data.get("meta", {}).get("responses", [])
                    results_warning = meta_responses[0].get("results_warnings") if meta_responses else None
                    if results_warning:
                        logger.warning(f"Results warning: {results_warning}")

                    times = attributes.get("times", [])
                    values = attributes.get("values", [])

                    if times and values and len(values) > 0 and len(values[0]) > 0:
                        validated_metrics.append(metric_name)
                    else:
                        failed_metrics.append(f"{metric_name}: No data points found")
                else:
                    failed_metrics.append(f"{metric_name}: No series data returned")

            except Exception as e:
                failed_metrics.append(f"❌ {metric_name}: Query failed (semantic_mode={semantic_mode}) - {e!s}")

        return validated_metrics, failed_metrics


def get_collector_metrics_from_scenario(scenario: "OtelCollectorScenario") -> list[dict[str, Any]]:
    """Helper function to get metrics from an OtelCollectorScenario.
    Returns a list of metric batch dictionaries
    """
    collector_log_path = f"{scenario.collector_container.log_folder_path}/logs/metrics.json"
    return OtelMetricsValidator.get_collector_metrics(collector_log_path)
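
# Typical flow (illustrative sketch; the spec filename, tag values, and the
# `scenario` variable below are placeholders, not values from any real test):
#
#     spec = OtelMetricsValidator.load_metrics_from_file(Path("metrics_spec.json"))
#     validator = OtelMetricsValidator(spec)
#     batches = get_collector_metrics_from_scenario(scenario)
#     _, _, results, failures = validator.process_and_validate_metrics(batches)
#     assert not failures, "\n".join(failures)
#     validated, failed = validator.query_backend_for_metrics(list(spec), {"env": "system-tests"})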