From 9c57b845a964d271d434f6b8f2026b5e8bd7f274 Mon Sep 17 00:00:00 2001 From: Keith Decker Date: Tue, 25 Nov 2025 11:29:46 -0700 Subject: [PATCH 1/3] feat: Add Cisco CircuIT evaluation model for Deepeval integration - Implemented Cisco CircuIT evaluation model in the new splunk-otel-util-genai-evals-splunk-circuit package. - Created configuration management for OAuth tokens and API requests. - Added support for caching tokens and handling token refresh logic. - Developed example script for running evaluations with the CircuIT model. - Enhanced Deepeval evaluator to support dynamic configuration of metrics and models. - Added tests for model instantiation, token management, and evaluation results. - Updated README and CHANGELOG for new package. --- .../multi_agent_travel_planner/README.rst | 46 +- .../circuit_support.py | 463 ++++++++++++++++++ .../multi_agent_travel_planner/main.py | 34 +- .../requirements.txt | 1 + .../README.rst | 4 + .../opentelemetry/util/evaluator/__init__.py | 11 +- .../opentelemetry/util/evaluator/deepeval.py | 39 +- .../util/evaluator/deepeval_models.py | 174 +++++++ .../util/evaluator/deepeval_runner.py | 36 +- .../tests/conftest.py | 18 + .../tests/test_deepeval_evaluator.py | 104 +++- .../CHANGELOG.md | 5 + .../README.rst | 37 ++ .../examples/run_circuit_evaluation.py | 202 ++++++++ .../pyproject.toml | 53 ++ .../util/evaluator/circuit_deepeval.py | 387 +++++++++++++++ .../util/evaluator/circuit_version.py | 3 + .../tests/conftest.py | 16 + .../tests/test_circuit_deepeval.py | 130 +++++ 19 files changed, 1727 insertions(+), 36 deletions(-) create mode 100644 instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/circuit_support.py create mode 100644 util/opentelemetry-util-genai-evals-deepeval/src/opentelemetry/util/evaluator/deepeval_models.py create mode 100644 util/opentelemetry-util-genai-evals-splunk-circuit/CHANGELOG.md create mode 100644 
util/opentelemetry-util-genai-evals-splunk-circuit/README.rst create mode 100644 util/opentelemetry-util-genai-evals-splunk-circuit/examples/run_circuit_evaluation.py create mode 100644 util/opentelemetry-util-genai-evals-splunk-circuit/pyproject.toml create mode 100644 util/opentelemetry-util-genai-evals-splunk-circuit/src/opentelemetry/util/evaluator/circuit_deepeval.py create mode 100644 util/opentelemetry-util-genai-evals-splunk-circuit/src/opentelemetry/util/evaluator/circuit_version.py create mode 100644 util/opentelemetry-util-genai-evals-splunk-circuit/tests/conftest.py create mode 100644 util/opentelemetry-util-genai-evals-splunk-circuit/tests/test_circuit_deepeval.py diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/README.rst b/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/README.rst index acdc724..b619f9b 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/README.rst +++ b/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/README.rst @@ -15,8 +15,14 @@ Prerequisites ------------- * Python 3.10+ -* An OpenAI API key with access to ``gpt-4o-mini`` (or set ``OPENAI_MODEL`` to a - model that is available to your account) +* One of the following LLM credential sets: + + - An OpenAI API key with access to ``gpt-4o-mini`` (or adjust ``OPENAI_MODEL``) + - Cisco CircuIT access with ``CISCO_APP_KEY`` plus either ``CISCO_CIRCUIT_TOKEN`` + or OAuth credentials (``CISCO_CLIENT_ID`` and ``CISCO_CLIENT_SECRET``). Set + ``TRAVEL_LLM_PROVIDER=circuit`` to activate the CircuIT integration. Optional + overrides: ``CIRCUIT_DEFAULT_DEPLOYMENT``, ``CIRCUIT_UPSTREAM_BASE`` and + ``CIRCUIT_TOKEN_CACHE`` * A running OTLP collector (gRPC on ``localhost:4317`` by default) Setup @@ -52,6 +58,42 @@ At the same time it streams OTLP traces. 
You should see: ``gen_ai.provider.name=openai`` and ``service.name`` derived from ``OTEL_SERVICE_NAME``. +Using Cisco CircuIT +------------------- + +To route the demo through Cisco CircuIT's OpenAI-compatible endpoint: + +.. code-block:: bash + + export TRAVEL_LLM_PROVIDER=circuit + export CISCO_APP_KEY=your-app-key + + # Option 1: provide a static access token + export CISCO_CIRCUIT_TOKEN=token-from-cisco + + # Option 2: let the demo mint tokens via OAuth client credentials + export CISCO_CLIENT_ID=your-client-id + export CISCO_CLIENT_SECRET=your-client-secret + # Optional cache file for minted tokens (defaults to a temp path) + export CIRCUIT_TOKEN_CACHE=/tmp/circuit_travel_demo.json + + # Optional overrides if you are proxying via LiteLLM or a CircuIT shim + export CIRCUIT_API_BASE=http://localhost:4000/v1 + export CIRCUIT_DEFAULT_DEPLOYMENT=gpt-5-nano + + # Optional: force OAuth minting even if a static token is present + export TRAVEL_FORCE_CIRCUIT_OAUTH=1 + + # Optional: disable connection debug prints (default is enabled) + export TRAVEL_DEBUG_CONNECTIONS=0 + + +Run ``python main.py`` after exporting these variables. The example automatically +includes the CircuIT ``appkey`` metadata and refreshes OAuth tokens on demand. +When ``TRAVEL_DEBUG_CONNECTIONS`` is enabled the script prints the token source, +cache location and a redacted preview so you can confirm whether a static value or +an OAuth-minted token is being used. 
+ Tear down --------- diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/circuit_support.py b/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/circuit_support.py new file mode 100644 index 0000000..7e44264 --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/circuit_support.py @@ -0,0 +1,463 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Reusable helpers for connecting LangChain apps to Cisco CircuIT.""" + +from __future__ import annotations + +import importlib +import json +import os +import tempfile +from datetime import datetime, timedelta, timezone +from pathlib import Path +from threading import Lock +from typing import Any, Dict, Iterable, Optional + +_TRUTHY = {"1", "true", "yes", "on"} +_FALSEY = {"0", "false", "no"} +_FORCE_OAUTH_ENV_VARS: tuple[str, ...] = ( + "CIRCUIT_FORCE_OAUTH", + "TRAVEL_FORCE_CIRCUIT_OAUTH", +) +_DEBUG_ENV_VARS: tuple[str, ...] 
= ( + "CIRCUIT_DEBUG_CONNECTIONS", + "TRAVEL_DEBUG_CONNECTIONS", +) +_CONNECTION_DEBUG_CACHE: set[str] = set() +_CONNECTION_DEBUG_LOCK = Lock() + + +def _truthy_env(name: str) -> bool: + value = os.getenv(name) + if value is None: + return False + return value.strip().lower() in _TRUTHY + + +def _debug_connections_enabled() -> bool: + # Respect the first explicit override; default to enabled for visibility. + for env in _DEBUG_ENV_VARS: + value = os.getenv(env) + if value is not None: + return value.strip().lower() not in _FALSEY + return True + + +_DEBUG_CONNECTIONS_ENABLED = _debug_connections_enabled() + + +def _token_preview(token: Optional[str]) -> str: + if not token: + return "" + length = len(token) + if length <= 8: + return f"{token[:2]}...{token[-2:]} (len={length})" + return f"{token[:4]}...{token[-4:]} (len={length})" + + +def _debug_token_message(message: str, preview: Optional[str] = None) -> None: + if not _DEBUG_CONNECTIONS_ENABLED: + return + if preview: + print(f"[circuit-debug] {message} token={preview}") + else: + print(f"[circuit-debug] {message}") + + +def circuit_provider_enabled(provider_setting: Optional[str] = None) -> bool: + if provider_setting: + return provider_setting.strip().lower() in {"circuit", "splunk-circuit"} + return bool(os.getenv("CISCO_APP_KEY") or os.getenv("CIRCUIT_APP_KEY")) + + +def resolve_model_name( + provider_setting: Optional[str] = None, + *, + default_openai_model: str = "gpt-5-nano", +) -> str: + if circuit_provider_enabled(provider_setting): + return ( + os.getenv("CIRCUIT_DEPLOYMENT") + or os.getenv("CIRCUIT_DEFAULT_DEPLOYMENT") + or os.getenv("OPENAI_MODEL") + or default_openai_model + ) + return os.getenv("OPENAI_MODEL", default_openai_model) + + +def _circuit_token_cache_path(default_filename: str) -> Path | None: + raw = os.getenv("CIRCUIT_TOKEN_CACHE") + if raw is None: + return Path(tempfile.gettempdir()) / default_filename + raw = raw.strip() + if not raw: + return None + return Path(raw).expanduser() 
+ + +def _read_cached_token(cache_path: Path | None) -> Optional[str]: + if not cache_path or not cache_path.exists(): + return None + try: + payload = json.loads(cache_path.read_text("utf-8")) + token = payload.get("access_token") + expires_raw = payload.get("expires_at") + if not token or not expires_raw: + return None + expires_at = datetime.fromisoformat(expires_raw) + if expires_at.tzinfo is None: + expires_at = expires_at.replace(tzinfo=timezone.utc) + if datetime.now(timezone.utc) < expires_at - timedelta(minutes=5): + return token + except Exception: # pragma: no cover - defensive + return None + return None + + +def _write_cached_token(cache_path: Path | None, token: str, expires_in: int) -> None: + if not cache_path: + return + expires_at = datetime.now(timezone.utc) + timedelta(seconds=max(expires_in, 0)) + payload = { + "access_token": token, + "expires_at": expires_at.isoformat(), + } + try: + cache_path.parent.mkdir(parents=True, exist_ok=True) + cache_path.write_text(json.dumps(payload), encoding="utf-8") + try: + os.chmod(cache_path, 0o600) + except Exception: + pass + except Exception: # pragma: no cover - defensive + _debug_token_message("Unable to persist CircuIT token cache") + + +def _fetch_circuit_token(client_id: str, client_secret: str, token_url: str) -> tuple[str, int]: + try: + requests = importlib.import_module("requests") + except ModuleNotFoundError as exc: # pragma: no cover - dependency guard + raise RuntimeError("requests is required to mint Cisco CircuIT access tokens") from exc + + response = requests.post( + token_url, + data={"grant_type": "client_credentials"}, + headers={ + "Accept": "application/json", + "Content-Type": "application/x-www-form-urlencoded", + }, + auth=(client_id, client_secret), + timeout=30, + ) + response.raise_for_status() + payload = response.json() + token = payload.get("access_token") + if not token: + raise RuntimeError("CircuIT token endpoint did not return access_token") + expires_in = 
int(payload.get("expires_in", 3600)) + return token, expires_in + + +def _force_oauth() -> bool: + return any(_truthy_env(name) for name in _FORCE_OAUTH_ENV_VARS) + + +def _mint_circuit_token(default_cache_filename: str) -> tuple[str, str, Optional[Path]]: + cache_path = _circuit_token_cache_path(default_cache_filename) + cached = _read_cached_token(cache_path) + if cached: + return cached, "oauth-cache", cache_path + + client_id = os.getenv("CISCO_CLIENT_ID") or os.getenv("CIRCUIT_CLIENT_ID") + client_secret = os.getenv("CISCO_CLIENT_SECRET") or os.getenv("CIRCUIT_CLIENT_SECRET") + if not client_id or not client_secret: + raise RuntimeError("Set CISCO_CIRCUIT_TOKEN or provide Cisco OAuth client credentials") + token_url = os.getenv("CISCO_TOKEN_URL", "https://id.cisco.com/oauth2/default/v1/token") + token, expires_in = _fetch_circuit_token(client_id, client_secret, token_url) + _write_cached_token(cache_path, token, expires_in) + return token, "oauth-fetch", cache_path + + +def _augment_circuit_kwargs( + model: str, + base_kwargs: Dict[str, Any], + *, + default_cache_filename: str, +) -> tuple[Dict[str, Any], Dict[str, Any]]: + kwargs: Dict[str, Any] = dict(base_kwargs) + debug: Dict[str, Any] = {"provider": "circuit", "deployment": model} + + app_key = os.getenv("CISCO_APP_KEY") or os.getenv("CIRCUIT_APP_KEY") + if not app_key: + raise RuntimeError("CISCO_APP_KEY (or CIRCUIT_APP_KEY) must be set when using CircuIT") + + raw_base = kwargs.pop("base_url", None) or os.getenv("CIRCUIT_API_BASE") + if raw_base: + sanitized = raw_base.rstrip("/") + if sanitized.endswith("/v1"): + sanitized = sanitized[: -len("/v1")] + if "/openai/deployments/" not in sanitized: + sanitized = f"{sanitized}/openai/deployments/{model}" + else: + upstream = os.getenv("CIRCUIT_UPSTREAM_BASE", "https://chat-ai.cisco.com").rstrip("/") + sanitized = f"{upstream}/openai/deployments/{model}" + if sanitized.endswith("/chat/completions"): + sanitized = sanitized[: -len("/chat/completions")] + 
base_url = sanitized + kwargs["base_url"] = base_url + debug["base_url"] = base_url + + force_oauth = _force_oauth() + debug["force_oauth"] = force_oauth + + api_key = kwargs.get("api_key") + token_source_label: Optional[str] = None + ignored_sources: list[str] = [] + + if api_key: + token_source_label = "kwargs" + else: + static_token: Optional[str] = None + static_name: Optional[str] = None + for candidate in ("CISCO_CIRCUIT_TOKEN", "CIRCUIT_ACCESS_TOKEN"): + value = os.getenv(candidate) + if value: + static_token = value + static_name = candidate + break + if static_token and not force_oauth: + api_key = static_token + token_source_label = f"static-env:{static_name}" + else: + if static_name and force_oauth: + ignored_sources.append(static_name) + + cache_path_str = "" + preview = "" + + if not api_key: + api_key, issued_from, cache_path = _mint_circuit_token(default_cache_filename) + token_source_label = issued_from + cache_path_str = str(cache_path) if cache_path else "" + debug["token_cache_path"] = cache_path_str + preview = _token_preview(api_key) + _debug_token_message( + f"minted CircuIT token ({issued_from}) cache={cache_path_str or ''}", + preview, + ) + else: + preview = _token_preview(api_key) + if token_source_label == "kwargs": + _debug_token_message("using CircuIT token supplied via kwargs", preview) + else: + env_name = token_source_label.split(":", 1)[-1] if token_source_label else "env" + _debug_token_message(f"using CircuIT token from {env_name}", preview) + + if ignored_sources: + joined = ",".join(sorted(set(ignored_sources))) + debug["ignored_token_sources"] = joined + _debug_token_message(f"force OAuth enabled so ignoring static token from {joined}") + + debug["token_source"] = token_source_label or "unknown" + debug["token_hint"] = preview + if cache_path_str: + debug["token_cache_path"] = cache_path_str + + kwargs["api_key"] = api_key + + default_headers = dict(kwargs.get("default_headers", {})) + default_headers.setdefault("api-key", 
api_key) + kwargs["default_headers"] = default_headers + + model_kwargs = dict(kwargs.get("model_kwargs", {})) + user_payload = {"appkey": app_key} + session_id = os.getenv("CIRCUIT_SESSION_ID") + user_id = os.getenv("CIRCUIT_USER_ID") + if session_id: + user_payload["session_id"] = session_id + debug["session_id_present"] = True + else: + debug["session_id_present"] = False + if user_id: + user_payload["user"] = user_id + debug["user_id_present"] = True + else: + debug["user_id_present"] = False + model_kwargs["user"] = json.dumps(user_payload) + kwargs["model_kwargs"] = model_kwargs + + timeout_env = os.getenv("CIRCUIT_TIMEOUT") + if timeout_env and "timeout" not in kwargs: + try: + kwargs["timeout"] = float(timeout_env) + debug["timeout"] = kwargs["timeout"] + except ValueError: + pass + + retries_env = os.getenv("CIRCUIT_MAX_RETRIES") + if retries_env and "max_retries" not in kwargs: + try: + kwargs["max_retries"] = int(retries_env) + debug["max_retries"] = kwargs["max_retries"] + except ValueError: + pass + + return kwargs, debug + + +def resolve_openai_kwargs( + model: str, + *, + provider_setting: Optional[str] = None, + default_cache_filename: str = "circuit_llm_token.json", +) -> tuple[Dict[str, Any], Dict[str, Any]]: + kwargs: Dict[str, Any] = {} + debug: Dict[str, Any] = {"provider": "openai", "model": model} + + base_url_envs: Iterable[str] = ( + "TRAVEL_OPENAI_BASE_URL", + "OPENAI_BASE_URL", + "OPENAI_API_BASE", + ) + for env in base_url_envs: + base_url = os.getenv(env) + if base_url: + base_url = base_url.rstrip("/") + kwargs["base_url"] = base_url + debug["base_url"] = base_url + break + + api_key = os.getenv("TRAVEL_OPENAI_API_KEY") or os.getenv("OPENAI_API_KEY") + if api_key: + kwargs["api_key"] = api_key + debug["api_key_present"] = True + + organization = os.getenv("OPENAI_ORG_ID") or os.getenv("OPENAI_ORGANIZATION") + if organization: + kwargs["organization"] = organization + debug["organization_present"] = True + + if 
circuit_provider_enabled(provider_setting): + kwargs, circuit_debug = _augment_circuit_kwargs( + model, + kwargs, + default_cache_filename=default_cache_filename, + ) + debug.update(circuit_debug) + else: + # Remove OpenAI API key when CircuIT is not selected to avoid leaking hints. + if circuit_provider_enabled(None): + kwargs.pop("api_key", None) + debug.pop("api_key_present", None) + debug["ignored_openai_api_key"] = True + debug["provider"] = "openai" + + return kwargs, debug + + +def log_connection_target(agent_name: str, model: str, debug: Dict[str, Any]) -> None: + if not _DEBUG_CONNECTIONS_ENABLED: + return + provider = debug.get("provider", "openai") + base_url = debug.get("base_url") or "" + token_source = debug.get("token_source") or ( + "api-key" if debug.get("api_key_present") else "env-default" + ) + key = f"{provider}|{base_url}|{model}" + with _CONNECTION_DEBUG_LOCK: + if key in _CONNECTION_DEBUG_CACHE: + return + _CONNECTION_DEBUG_CACHE.add(key) + parts = [ + f"provider={provider}", + f"model={model}", + f"base_url={base_url}", + f"token_source={token_source}", + f"agent={agent_name}", + ] + if debug.get("session_id_present") is not None: + parts.append(f"session_flag={'1' if debug.get('session_id_present') else '0'}") + if debug.get("user_id_present") is not None: + parts.append(f"user_flag={'1' if debug.get('user_id_present') else '0'}") + token_hint = debug.get("token_hint") + if token_hint: + parts.append(f"token_hint={token_hint}") + cache_path = debug.get("token_cache_path") + if cache_path: + parts.append(f"cache={cache_path}") + if debug.get("force_oauth"): + parts.append("force_oauth=1") + ignored = debug.get("ignored_token_sources") + if ignored: + parts.append(f"ignored={ignored}") + print("[circuit-debug] " + " ".join(parts)) + + +def create_chat_openai( + agent_name: str, + *, + session_id: Optional[str] = None, + temperature: float = 0.0, + provider_setting: Optional[str] = None, + default_openai_model: str = "gpt-5-nano", + 
default_cache_filename: str = "circuit_llm_token.json", + tags: Optional[list[str]] = None, + metadata: Optional[Dict[str, Any]] = None, + model_override: Optional[str] = None, +) -> Any: + try: + module = importlib.import_module("langchain_openai") + ChatOpenAI = getattr(module, "ChatOpenAI") + except Exception as exc: # pragma: no cover - dependency guard + raise RuntimeError( + "langchain_openai.ChatOpenAI is required for this demo" + ) from exc + + model = model_override or resolve_model_name( + provider_setting, default_openai_model=default_openai_model + ) + kwargs, debug = resolve_openai_kwargs( + model, + provider_setting=provider_setting, + default_cache_filename=default_cache_filename, + ) + log_connection_target(agent_name, model, debug) + + final_tags = list(tags) if tags is not None else [] + if f"agent:{agent_name}" not in final_tags: + final_tags.append(f"agent:{agent_name}") + + llm_metadata: Dict[str, Any] = dict(metadata or {}) + llm_metadata.setdefault("agent_name", agent_name) + if session_id is not None: + llm_metadata.setdefault("session_id", session_id) + llm_metadata.setdefault("thread_id", session_id) + + return ChatOpenAI( + model=model, + temperature=temperature, + tags=final_tags, + metadata=llm_metadata, + **kwargs, + ) + + +__all__ = [ + "circuit_provider_enabled", + "resolve_model_name", + "resolve_openai_kwargs", + "log_connection_target", + "create_chat_openai", +] diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/main.py b/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/main.py index ac65cc0..358080d 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/main.py +++ b/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/main.py @@ -24,11 +24,11 @@ """ import argparse +import json import os import random -import json -from 
datetime import datetime, timedelta import time +from datetime import datetime, timedelta from typing import Annotated, Dict, List, Optional, TypedDict from uuid import uuid4 @@ -48,6 +48,8 @@ create_agent as _create_react_agent, # type: ignore[attr-defined] ) +from circuit_support import create_chat_openai, resolve_model_name + # --------------------------------------------------------------------------- # Sample data utilities @@ -172,13 +174,19 @@ class PlannerState(TypedDict): poison_events: List[str] -def _model_name() -> str: - return os.getenv("OPENAI_MODEL", "gpt-5-nano") +class PoisonConfig(TypedDict): + prob: float + types: List[str] + max: int + +def _provider_setting() -> str: + return os.getenv("TRAVEL_LLM_PROVIDER", "").strip().lower() def _create_llm(agent_name: str, *, temperature: float, session_id: str) -> ChatOpenAI: """Create an LLM instance decorated with tags/metadata for tracing.""" - model = _model_name() + provider_setting = _provider_setting() + model = resolve_model_name(provider_setting, default_openai_model="gpt-5-nano") tags = [f"agent:{agent_name}", "travel-planner"] metadata = { "agent_name": agent_name, @@ -188,11 +196,16 @@ def _create_llm(agent_name: str, *, temperature: float, session_id: str) -> Chat "ls_model_name": model, "ls_temperature": temperature, } - return ChatOpenAI( - model=model, + return create_chat_openai( + agent_name, + session_id=session_id, temperature=temperature, + provider_setting=provider_setting, + default_openai_model="gpt-5-nano", + default_cache_filename="circuit_travel_demo_token.json", tags=tags, metadata=metadata, + model_override=model, ) @@ -201,7 +214,7 @@ def _create_llm(agent_name: str, *, temperature: float, session_id: str) -> Chat # --------------------------------------------------------------------------- -def _poison_config() -> Dict[str, object]: +def _poison_config() -> PoisonConfig: """Read environment variables controlling prompt poisoning. 
TRAVEL_POISON_PROB: Base probability (0-1) that a given agent step is poisoned. @@ -233,8 +246,9 @@ def _poison_config() -> Dict[str, object]: random.seed(int(seed)) except ValueError: random.seed(seed) + bounded_prob = max(0.0, min(prob, 1.0)) return { - "prob": max(0.0, min(prob, 1.0)), + "prob": bounded_prob, "types": types, "max": max_snippets, } @@ -282,7 +296,7 @@ def maybe_add_quality_noise( if random.random() > cfg["prob"]: return base_prompt # choose subset - available = cfg["types"] + available = list(cfg["types"]) random.shuffle(available) count = random.randint(1, min(cfg["max"], len(available))) chosen = available[:count] diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/requirements.txt b/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/requirements.txt index 186e370..220385a 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/requirements.txt +++ b/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/requirements.txt @@ -3,3 +3,4 @@ langchain-openai>=1.0.0 langgraph>=1.0.0 python-dotenv>=1.0.0 deepeval +requests>=2.31.0 diff --git a/util/opentelemetry-util-genai-evals-deepeval/README.rst b/util/opentelemetry-util-genai-evals-deepeval/README.rst index 87682cc..c263460 100644 --- a/util/opentelemetry-util-genai-evals-deepeval/README.rst +++ b/util/opentelemetry-util-genai-evals-deepeval/README.rst @@ -29,6 +29,10 @@ Requirements To override the model, set ``DEEPEVAL_EVALUATION_MODEL`` (or ``DEEPEVAL_MODEL`` / ``OPENAI_MODEL``) to a different deployment along with the corresponding provider credentials. + Custom Deepeval evaluation models can be contributed by external packages via + the ``opentelemetry_util_genai_evals.deepeval_models`` entry-point group. 
When a + package registers a model under a specific name you can select it by setting + ``DEEPEVAL_MODEL`` to that name. * (Optional) ``DEEPEVAL_API_KEY`` if your Deepeval account requires it. Configuration diff --git a/util/opentelemetry-util-genai-evals-deepeval/src/opentelemetry/util/evaluator/__init__.py b/util/opentelemetry-util-genai-evals-deepeval/src/opentelemetry/util/evaluator/__init__.py index 6899628..355b611 100644 --- a/util/opentelemetry-util-genai-evals-deepeval/src/opentelemetry/util/evaluator/__init__.py +++ b/util/opentelemetry-util-genai-evals-deepeval/src/opentelemetry/util/evaluator/__init__.py @@ -12,7 +12,16 @@ # See the License for the specific language governing permissions and # limitations under the License. -"""Deepeval evaluator plugin package.""" +"""Evaluator plugin package. + +The module exposes the built-in Deepeval evaluator and extends the package +search path so additional evaluator helpers (for example custom Deepeval +models) can live in separate distributions under the same namespace. 
+""" + +from pkgutil import extend_path + +__path__ = extend_path(__path__, __name__) # type: ignore[name-defined] from .deepeval import DeepevalEvaluator, register, registration diff --git a/util/opentelemetry-util-genai-evals-deepeval/src/opentelemetry/util/evaluator/deepeval.py b/util/opentelemetry-util-genai-evals-deepeval/src/opentelemetry/util/evaluator/deepeval.py index 891b16c..8725735 100644 --- a/util/opentelemetry-util-genai-evals-deepeval/src/opentelemetry/util/evaluator/deepeval.py +++ b/util/opentelemetry-util-genai-evals-deepeval/src/opentelemetry/util/evaluator/deepeval.py @@ -15,9 +15,11 @@ from __future__ import annotations +import importlib import logging import os import re as _re +import sys from collections.abc import Mapping as MappingABC from dataclasses import dataclass from typing import Any, Iterable, Mapping, Sequence @@ -47,6 +49,7 @@ from .deepeval_metrics import ( instantiate_metrics as _instantiate_metrics, ) +from .deepeval_models import resolve_model as _resolve_model from .deepeval_runner import run_evaluation as _run_deepeval try: # Optional debug logging import @@ -364,6 +367,17 @@ def _evaluate_generic( ) except Exception: # pragma: no cover pass + + module = sys.modules.get("deepeval") + if module is None: + try: + module = importlib.import_module("deepeval") + except Exception: + module = None + if module is None: + return self._error_results( + "Deepeval dependency is not available", ModuleNotFoundError + ) metric_specs = self._build_metric_specs() if not metric_specs: genai_debug_log( @@ -413,18 +427,28 @@ def _evaluate_generic( "GENAI_OPENAI_API_KEY" ) api_key = candidate or env_key + base_url = os.getenv("OPENAI_API_BASE") or os.getenv( + "GENAI_OPENAI_API_BASE" + ) if api_key: - # Attempt to configure Deepeval/OpenAI client. try: # pragma: no cover - external dependency - # Support legacy openai<1 and new openai>=1 semantics. 
if not getattr(openai, "api_key", None): # type: ignore[attr-defined] try: - setattr(openai, "api_key", api_key) # legacy style - except Exception: # pragma: no cover + setattr(openai, "api_key", api_key) + except Exception: pass - # Ensure env var set for client() style usage. if not os.getenv("OPENAI_API_KEY"): os.environ["OPENAI_API_KEY"] = api_key + if base_url: + try: + setattr(openai, "base_url", base_url) + except Exception: + pass + try: + setattr(openai, "api_base", base_url) + except Exception: + os.environ.setdefault("OPENAI_API_BASE", base_url) + os.environ.setdefault("OPENAI_BASE_URL", base_url) except Exception: pass except Exception: # pragma: no cover - defensive @@ -614,13 +638,16 @@ def _coerce_option(value: Any) -> Any: # per-metric param check handled in deepeval_metrics @staticmethod - def _default_model() -> str | None: + def _default_model() -> Any: model = ( os.getenv("DEEPEVAL_EVALUATION_MODEL") or os.getenv("DEEPEVAL_MODEL") or os.getenv("OPENAI_MODEL") ) if model: + custom_model = _resolve_model(model) + if custom_model is not None: + return custom_model return model return "gpt-4o-mini" diff --git a/util/opentelemetry-util-genai-evals-deepeval/src/opentelemetry/util/evaluator/deepeval_models.py b/util/opentelemetry-util-genai-evals-deepeval/src/opentelemetry/util/evaluator/deepeval_models.py new file mode 100644 index 0000000..5d7bdce --- /dev/null +++ b/util/opentelemetry-util-genai-evals-deepeval/src/opentelemetry/util/evaluator/deepeval_models.py @@ -0,0 +1,174 @@ +# Copyright The OpenTelemetry Authors +# Licensed under the Apache License, Version 2.0 + +"""Plugin registry for Deepeval evaluation models. + +This module exposes a lightweight registry that allows additional packages to +register :class:`deepeval.models.base_model.DeepEvalBaseLLM` factories. 
The +registry is populated from two sources: + +* Direct calls to :func:`register_model` (used internally and by tests) +* Python entry points declared under the group + ``opentelemetry_util_genai_evals.deepeval_models`` + +Each entry point name becomes the lookup key. The value must be a callable that +returns a ``DeepEvalBaseLLM`` instance when invoked with no arguments. The +registry caches instantiated models to avoid repeated token provisioning while +ensuring a fresh model can be created on demand when required. +""" + +from __future__ import annotations + +import logging +from collections.abc import Callable +from importlib import metadata +from threading import RLock +from typing import Any + +try: # Deepeval is an optional dependency; only import when available. + from deepeval.models.base_model import DeepEvalBaseLLM # type: ignore +except Exception: # pragma: no cover - dependency missing during tests + DeepEvalBaseLLM = type("DeepEvalBaseLLM", (), {}) # type: ignore[misc] + + +_LOGGER = logging.getLogger(__name__) + +_ENTRYPOINT_GROUP = "opentelemetry_util_genai_evals.deepeval_models" + +_MODEL_FACTORIES: dict[str, Callable[[], DeepEvalBaseLLM | None]] = {} +_MODEL_CACHE: dict[str, DeepEvalBaseLLM] = {} +_ENTRYPOINTS_LOADED = False +_LOCK = RLock() + + +def _normalized(name: str) -> str: + return name.strip().lower() + + +def register_model( + name: str, factory: Callable[[], DeepEvalBaseLLM | None] +) -> None: + """Register a Deepeval evaluation model factory. + + Parameters + ---------- + name: + The key that end users reference via ``DEEPEVAL_MODEL`` or metric-level + ``model`` overrides. Normalized to lowercase. + factory: + Zero-argument callable returning a ``DeepEvalBaseLLM`` instance (or + ``None`` to indicate registration failure). 
+ """ + + if not name: + raise ValueError("Model name must be a non-empty string") + if not callable(factory): + raise TypeError("Model factory must be callable") + key = _normalized(name) + with _LOCK: + _MODEL_FACTORIES[key] = factory + _MODEL_CACHE.pop(key, None) + + +def _load_entrypoints() -> None: + global _ENTRYPOINTS_LOADED + if _ENTRYPOINTS_LOADED: + return + with _LOCK: + if _ENTRYPOINTS_LOADED: + return + eps: list[Any] + try: + eps_obj = metadata.entry_points() + if hasattr(eps_obj, "select"): + eps = list(eps_obj.select(group=_ENTRYPOINT_GROUP)) # type: ignore[arg-type] + else: # pragma: no cover - legacy structure + eps = list(eps_obj.get(_ENTRYPOINT_GROUP, [])) # type: ignore[attr-defined] + except Exception as exc: # pragma: no cover - defensive + _LOGGER.debug( + "Failed to enumerate Deepeval model entry points: %s", exc + ) + eps = [] + for ep in eps: + try: + obj = ep.load() + except Exception as exc: # pragma: no cover - defensive + _LOGGER.warning( + "Skipping Deepeval model entry point '%s': %s", + ep.name, + exc, + ) + continue + if not callable(obj): + _LOGGER.warning( + "Entry point '%s' does not provide a callable factory", + ep.name, + ) + continue + try: + register_model(ep.name, obj) + except Exception as exc: # pragma: no cover - defensive + _LOGGER.warning( + "Failed to register Deepeval model '%s': %s", ep.name, exc + ) + _ENTRYPOINTS_LOADED = True + + +def resolve_model(name: str) -> DeepEvalBaseLLM | None: + """Return an instance of the registered model if available.""" + + if not name: + return None + _load_entrypoints() + key = _normalized(name) + factory: Callable[[], DeepEvalBaseLLM | None] | None + with _LOCK: + if key in _MODEL_CACHE: + return _MODEL_CACHE[key] + factory = _MODEL_FACTORIES.get(key) + if factory is None: + return None + try: + instance = factory() + except Exception as exc: # pragma: no cover - defensive + _LOGGER.warning( + "Model factory for '%s' raised an exception: %s", key, exc + ) + return None + if 
instance is None: + return None + if not isinstance(instance, DeepEvalBaseLLM): + _LOGGER.warning( + "Model factory for '%s' did not return a DeepEvalBaseLLM instance", + key, + ) + return None + with _LOCK: + _MODEL_CACHE[key] = instance + return instance + + +def list_models() -> list[str]: + """Return the list of currently registered model keys.""" + + _load_entrypoints() + with _LOCK: + return sorted(_MODEL_FACTORIES) + + +def clear_models() -> None: + """Reset the registry (used primarily in tests).""" + + global _ENTRYPOINTS_LOADED + with _LOCK: + _MODEL_FACTORIES.clear() + _MODEL_CACHE.clear() + _ENTRYPOINTS_LOADED = False + + +__all__ = [ + "register_model", + "resolve_model", + "list_models", + "clear_models", +] diff --git a/util/opentelemetry-util-genai-evals-deepeval/src/opentelemetry/util/evaluator/deepeval_runner.py b/util/opentelemetry-util-genai-evals-deepeval/src/opentelemetry/util/evaluator/deepeval_runner.py index 6e2534d..67a51ee 100644 --- a/util/opentelemetry-util-genai-evals-deepeval/src/opentelemetry/util/evaluator/deepeval_runner.py +++ b/util/opentelemetry-util-genai-evals-deepeval/src/opentelemetry/util/evaluator/deepeval_runner.py @@ -5,10 +5,23 @@ import io from contextlib import redirect_stderr, redirect_stdout -from typing import Any, Callable, Sequence +from importlib import import_module +from inspect import signature +from typing import Any, Callable, Dict, Sequence -from deepeval import evaluate as deepeval_evaluate -from deepeval.evaluate.configs import AsyncConfig, DisplayConfig +from deepeval import evaluate as deepeval_evaluate # type: ignore[import] + +_configs_module = import_module("deepeval.evaluate.configs") +AsyncConfig = getattr(_configs_module, "AsyncConfig") +DisplayConfig = getattr(_configs_module, "DisplayConfig") +CacheConfig = getattr(_configs_module, "CacheConfig", None) + +_evaluate_params = set(signature(deepeval_evaluate).parameters) +_supports_async_config = "async_config" in _evaluate_params 
+_supports_display_config = "display_config" in _evaluate_params +_supports_cache_config = ( + "cache_config" in _evaluate_params and CacheConfig is not None +) def run_evaluation( @@ -16,16 +29,25 @@ def run_evaluation( metrics: Sequence[Any], debug_log: Callable[..., None] | None = None, ) -> Any: - display_config = DisplayConfig(show_indicator=False, print_results=False) - async_config = AsyncConfig(run_async=False) + call_kwargs: Dict[str, Any] = {} + if _supports_display_config: + display_config = DisplayConfig( + show_indicator=False, print_results=False + ) + call_kwargs["display_config"] = display_config + if _supports_async_config: + async_config = AsyncConfig(run_async=False) + call_kwargs["async_config"] = async_config + if _supports_cache_config and CacheConfig is not None: + cache_config = CacheConfig(write_cache=False, use_cache=False) + call_kwargs["cache_config"] = cache_config stdout_buffer = io.StringIO() stderr_buffer = io.StringIO() with redirect_stdout(stdout_buffer), redirect_stderr(stderr_buffer): result = deepeval_evaluate( [test_case], list(metrics), - async_config=async_config, - display_config=display_config, + **call_kwargs, ) if debug_log is not None: out = stdout_buffer.getvalue().strip() diff --git a/util/opentelemetry-util-genai-evals-deepeval/tests/conftest.py b/util/opentelemetry-util-genai-evals-deepeval/tests/conftest.py index cc25806..4fbb84c 100644 --- a/util/opentelemetry-util-genai-evals-deepeval/tests/conftest.py +++ b/util/opentelemetry-util-genai-evals-deepeval/tests/conftest.py @@ -2,6 +2,24 @@ import sys from pathlib import Path +import pytest # type: ignore[import] + _src = Path(__file__).resolve().parents[1] / "src" if str(_src) not in sys.path: sys.path.insert(0, str(_src)) + + +@pytest.fixture(autouse=True) +def _patch_entry_points(monkeypatch): + # Avoid enumerating full environment entry points during tests, which can be slow. 
+ class _EmptyEntryPoints(list): + def select(self, **kwargs): # type: ignore[override] + return [] + + empty_eps = _EmptyEntryPoints() + + monkeypatch.setattr( + "opentelemetry.util.evaluator.deepeval_models.metadata.entry_points", + lambda: empty_eps, + ) + yield diff --git a/util/opentelemetry-util-genai-evals-deepeval/tests/test_deepeval_evaluator.py b/util/opentelemetry-util-genai-evals-deepeval/tests/test_deepeval_evaluator.py index cd07fa0..bdbc420 100644 --- a/util/opentelemetry-util-genai-evals-deepeval/tests/test_deepeval_evaluator.py +++ b/util/opentelemetry-util-genai-evals-deepeval/tests/test_deepeval_evaluator.py @@ -8,6 +8,7 @@ # ruff: noqa: E402 import importlib +import os import sys from unittest.mock import patch @@ -16,21 +17,26 @@ # Provide stub 'deepeval' package structure if dependency is unavailable. def _install_deepeval_stubs(): - if "deepeval" in sys.modules: - return - try: - import importlib as _il # noqa: F401 - - __import__("deepeval") # pragma: no cover - return - except Exception: - pass + if os.getenv("OTEL_TEST_USE_REAL_DEEPEVAL") == "1": + try: + import importlib as _il # noqa: F401 + + __import__("deepeval") # pragma: no cover + return + except Exception: + pass + # Ensure any previously imported real modules are cleared so our stubs take effect. 
+ for name in list(sys.modules): + if name == "deepeval" or name.startswith("deepeval."): + sys.modules.pop(name, None) import types root = types.ModuleType("deepeval") metrics_mod = types.ModuleType("deepeval.metrics") test_case_mod = types.ModuleType("deepeval.test_case") eval_cfg_mod = types.ModuleType("deepeval.evaluate.configs") + models_root_mod = types.ModuleType("deepeval.models") + models_base_mod = types.ModuleType("deepeval.models.base_model") class _ReqParam: def __init__(self, value): @@ -80,6 +86,22 @@ def __init__(self, **kwargs): metrics_mod.AnswerRelevancyMetric = AnswerRelevancyMetric metrics_mod.FaithfulnessMetric = FaithfulnessMetric + class DeepEvalBaseLLM: # minimal behaviour for registry tests + def load_model(self): + return self + + def generate(self, prompt: str): # pragma: no cover - unused + raise NotImplementedError + + async def a_generate(self, prompt: str): # pragma: no cover - unused + raise NotImplementedError + + def get_model_name(self): # pragma: no cover - unused + return "stub" + + models_base_mod.DeepEvalBaseLLM = DeepEvalBaseLLM + models_root_mod.base_model = models_base_mod + class LLMTestCaseParams: INPUT_OUTPUT = "io" INPUT = "input" @@ -124,6 +146,8 @@ class _Eval: sys.modules["deepeval.test_case"] = test_case_mod sys.modules["deepeval.evaluate"] = root # simplify sys.modules["deepeval.evaluate.configs"] = eval_cfg_mod + sys.modules["deepeval.models"] = models_root_mod + sys.modules["deepeval.models.base_model"] = models_base_mod _install_deepeval_stubs() @@ -179,6 +203,12 @@ def __init__(self, *, test_results: list[TestResult], confident_link=None): from opentelemetry.util.evaluator import deepeval as plugin +from opentelemetry.util.evaluator.deepeval_models import ( + clear_models as clear_model_registry, +) +from opentelemetry.util.evaluator.deepeval_models import ( + register_model, +) from opentelemetry.util.genai.evals.registry import ( clear_registry, get_evaluator, @@ -197,8 +227,10 @@ def _reset_registry(): 
clear_registry() importlib.reload(plugin) plugin.register() + clear_model_registry() yield clear_registry() + clear_model_registry() def _build_invocation() -> LLMInvocation: @@ -348,7 +380,59 @@ def boom(specs, test_case, model): results = evaluator.evaluate(invocation) assert len(results) == 1 assert results[0].error is not None - assert "boom" in results[0].error.message + + +def test_custom_deepeval_model_registry(monkeypatch): + import sys + + models_mod = sys.modules["deepeval.models.base_model"] + + class DummyModel(models_mod.DeepEvalBaseLLM): # type: ignore[attr-defined] + def __init__(self): + self._loaded = False + + def load_model(self): + self._loaded = True + return self + + def generate(self, prompt: str) -> str: + self.load_model() + return "stubbed" + + async def a_generate( + self, prompt: str + ) -> str: # pragma: no cover - sync used + return self.generate(prompt) + + def get_model_name(self) -> str: + return "dummy" + + register_model("custom-circuit", DummyModel) + + invocation = _build_invocation() + evaluator = plugin.DeepevalEvaluator( + ("bias",), invocation_type="LLMInvocation" + ) + + captured = {} + + def fake_instantiate(specs, test_case, model): + captured["model"] = model + return [object()], [] + + monkeypatch.setenv("DEEPEVAL_MODEL", "custom-circuit") + monkeypatch.setattr( + "opentelemetry.util.evaluator.deepeval._instantiate_metrics", + fake_instantiate, + ) + monkeypatch.setattr( + "opentelemetry.util.evaluator.deepeval._run_deepeval", + lambda case, metrics, debug_log: DeeEvaluationResult(test_results=[]), + ) + + evaluator.evaluate(invocation) + assert "model" in captured + assert isinstance(captured["model"], DummyModel) def test_evaluator_missing_output(monkeypatch): diff --git a/util/opentelemetry-util-genai-evals-splunk-circuit/CHANGELOG.md b/util/opentelemetry-util-genai-evals-splunk-circuit/CHANGELOG.md new file mode 100644 index 0000000..9d49cc3 --- /dev/null +++ 
b/util/opentelemetry-util-genai-evals-splunk-circuit/CHANGELOG.md @@ -0,0 +1,5 @@ +# Changelog + +## Unreleased + +- Initial release providing the Cisco CircuIT Deepeval evaluation model. diff --git a/util/opentelemetry-util-genai-evals-splunk-circuit/README.rst b/util/opentelemetry-util-genai-evals-splunk-circuit/README.rst new file mode 100644 index 0000000..c00d4df --- /dev/null +++ b/util/opentelemetry-util-genai-evals-splunk-circuit/README.rst @@ -0,0 +1,37 @@ +splunk-otel-util-genai-evals-splunk-circuit +=========================================== + +This package adds a Cisco CircuIT evaluation model that can be used with +``splunk-otel-genai-evals-deepeval``. It registers a custom Deepeval model under +both ``splunk-circuit`` and ``circuit`` so it can be selected by setting the +``DEEPEVAL_MODEL`` environment variable. + +Quick start +----------- + +1. Install the package alongside the base Deepeval integration:: + + pip install splunk-otel-genai-evals-deepeval splunk-otel-util-genai-evals-splunk-circuit + +2. Configure credentials using the same environment variables as the local + CircuIT shim:: + + export CISCO_CLIENT_ID=... + export CISCO_CLIENT_SECRET=... + export CISCO_APP_KEY=... + export DEEPEVAL_MODEL=splunk-circuit + + Optional overrides: + + * ``CIRCUIT_UPSTREAM_BASE`` - base URL for the CircuIT API + * ``CISCO_TOKEN_URL`` - OAuth token endpoint (default + ``https://id.cisco.com/oauth2/default/v1/token``) + * ``CIRCUIT_TOKEN_CACHE`` - path for cached access tokens + * ``CIRCUIT_DEFAULT_DEPLOYMENT`` - default deployment/model name used for + evaluations + * ``CISCO_CIRCUIT_TOKEN`` - supply a pre-minted token instead of client + credentials + +3. Run evaluations as usual. The Deepeval integration will automatically create + the CircuIT evaluation model and use it to score metrics. 
+ diff --git a/util/opentelemetry-util-genai-evals-splunk-circuit/examples/run_circuit_evaluation.py b/util/opentelemetry-util-genai-evals-splunk-circuit/examples/run_circuit_evaluation.py new file mode 100644 index 0000000..3754d41 --- /dev/null +++ b/util/opentelemetry-util-genai-evals-splunk-circuit/examples/run_circuit_evaluation.py @@ -0,0 +1,202 @@ +#!/usr/bin/env python3 +# pyright: ignore +"""Run a Deepeval assessment using Cisco CircuIT as the judge model.""" + +from __future__ import annotations + +import argparse +import importlib +import os +import sys +from typing import Any, Callable, Iterable, cast + + +def _load_dependency(module: str, attr: str | None = None): + try: + mod = importlib.import_module(module) + except ImportError as exc: # pragma: no cover - runtime guard + raise SystemExit( + "This example requires splunk-otel-genai-evals-deepeval and the CircuIT package on PYTHONPATH." + ) from exc + if attr is None: + return mod + try: + return getattr(mod, attr) + except AttributeError as exc: # pragma: no cover - defensive + raise SystemExit( + f"Module '{module}' is missing expected attribute '{attr}'." 
+ ) from exc + + +create_circuit_llm = cast( + Callable[[], Any], + _load_dependency( + "opentelemetry.util.evaluator.circuit_deepeval", "create_circuit_llm" + ), +) +DeepevalEvaluator = cast( + Any, + _load_dependency( + "opentelemetry.util.evaluator.deepeval", "DeepevalEvaluator" + ), +) +register_model = cast( + Callable[[str, Callable[[], Any]], None], + _load_dependency( + "opentelemetry.util.evaluator.deepeval_models", "register_model" + ), +) +_types_mod = _load_dependency("opentelemetry.util.genai.types") +InputMessage = getattr(_types_mod, "InputMessage") +LLMInvocation = getattr(_types_mod, "LLMInvocation") +OutputMessage = getattr(_types_mod, "OutputMessage") +Text = getattr(_types_mod, "Text") + + +def _ensure_circuit_registration(model_alias: str) -> None: + """Register CircuIT aliases and set the active Deepeval model.""" + + previous = os.environ.get("DEEPEVAL_MODEL") + os.environ["DEEPEVAL_MODEL"] = model_alias + if previous != model_alias: + print(f"Set DEEPEVAL_MODEL to '{model_alias}' (was {previous!r})") + else: + print(f"Using existing DEEPEVAL_MODEL='{model_alias}'") + + for alias in ("splunk-circuit", "circuit"): + register_model(alias, create_circuit_llm) + print("Registered CircuIT model aliases: splunk-circuit, circuit") + + +def _build_invocation(prompt: str, response: str, model_name: str): + invocation = LLMInvocation(request_model=model_name) + invocation.input_messages.append( + InputMessage(role="user", parts=[Text(content=prompt)]) + ) + invocation.output_messages.append( + OutputMessage( + role="assistant", + parts=[Text(content=response)], + finish_reason="stop", + ) + ) + return invocation + + +def _format_metrics( + results: Iterable[Any], show_details: bool = False +) -> Iterable[str]: + for result in results: + label = result.label or "-" + score = "-" if result.score is None else f"{result.score:.3f}" + status = result.attributes.get("gen_ai.evaluation.passed") + status_text = "?" 
if status is None else ("pass" if status else "fail") + line = f"{result.metric_name:20s} score={score:>6s} label={label:>10s} status={status_text}" + if show_details: + explanation = getattr(result, "explanation", None) + error = getattr(result, "error", None) + if explanation: + line += f" explanation={explanation!r}" + if error: + line += f" error={getattr(error, 'message', error)!r}" + yield line + + +def main() -> int: + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument( + "--prompt", + default="Summarize the assistant reply.", + help="User prompt text supplied to the LLM under evaluation.", + ) + parser.add_argument( + "--response", + default="The assistant responded with this placeholder answer.", + help="Assistant response text to evaluate.", + ) + parser.add_argument( + "--metrics", + nargs="+", + default=None, + help="Optional list of Deepeval metrics to run (defaults to plugin defaults).", + ) + parser.add_argument( + "--model", + default=None, + help="Deepeval model alias to use (default: splunk-circuit).", + ) + parser.add_argument( + "--show-errors", + action="store_true", + help="Include Deepeval error/explanation details in the output table.", + ) + args = parser.parse_args() + + desired_model = ( + args.model or os.getenv("DEEPEVAL_MODEL") or "splunk-circuit" + ) + # Remove conflicting overrides so our CircuIT model is respected. + if args.model: + os.environ.pop("DEEPEVAL_EVALUATION_MODEL", None) + if os.getenv("OPENAI_MODEL") and desired_model in { + "splunk-circuit", + "circuit", + }: + print( + "OPENAI_MODEL is set; ignoring it in favour of CircuIT model alias." 
+ ) + os.environ.pop("OPENAI_MODEL", None) + + _ensure_circuit_registration(desired_model) + + model_name = os.getenv("DEEPEVAL_MODEL", desired_model) + invocation = _build_invocation(args.prompt, args.response, model_name) + + evaluator = DeepevalEvaluator( + tuple(args.metrics) if args.metrics else None, + invocation_type="LLMInvocation", + ) + resolved_model = None + try: + resolved_model = evaluator._default_model() + except Exception: + pass + if resolved_model is not None: + resolved_desc = getattr( + resolved_model, "get_model_name", lambda: repr(resolved_model) + )() + print( + f"Resolved Deepeval judge: {resolved_desc} ({resolved_model.__class__.__name__})" + ) + print( + f"Invoking Deepeval with model '{model_name}' and metrics {evaluator.metrics}" + ) + try: + results = list(evaluator.evaluate(invocation)) + except Exception as exc: + print( + f"Deepeval evaluation raised an exception: {exc}", file=sys.stderr + ) + return 2 + + if not results: + print( + "No evaluation results were produced. Check your environment settings.", + file=sys.stderr, + ) + return 1 + + print("Deepeval results via Cisco CircuIT:") + for line in _format_metrics(results, show_details=args.show_errors): + print(" " + line) + error_count = sum(1 for item in results if getattr(item, "error", None)) + if error_count: + print( + f"Encountered errors for {error_count} metric(s); inspect the logs above for details." 
+ ) + print("Evaluation complete") + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/util/opentelemetry-util-genai-evals-splunk-circuit/pyproject.toml b/util/opentelemetry-util-genai-evals-splunk-circuit/pyproject.toml new file mode 100644 index 0000000..0ef31b7 --- /dev/null +++ b/util/opentelemetry-util-genai-evals-splunk-circuit/pyproject.toml @@ -0,0 +1,53 @@ +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[project] +name = "splunk-otel-util-genai-evals-splunk-circuit" +dynamic = ["version"] +description = "Cisco CircuIT evaluation model for splunk-otel-genai-evals" +readme = "README.rst" +license = "Apache-2.0" +requires-python = ">=3.9" +authors = [ + { name = "OpenTelemetry Authors", email = "cncf-opentelemetry-contributors@lists.cncf.io" }, +] +classifiers = [ + "Development Status :: 4 - Beta", + "Intended Audience :: Developers", + "License :: OSI Approved :: Apache Software License", + "Programming Language :: Python", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.9", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", + "Programming Language :: Python :: 3.13", +] +dependencies = [ + "splunk-otel-genai-evals-deepeval>=0.1.0", + "requests>=2.31", +] + +[project.entry-points."opentelemetry_util_genai_evals.deepeval_models"] +"splunk-circuit" = "opentelemetry.util.evaluator.circuit_deepeval:create_circuit_llm" +"circuit" = "opentelemetry.util.evaluator.circuit_deepeval:create_circuit_llm" + +[project.optional-dependencies] +test = ["pytest>=7.0.0"] + +[project.urls] +Homepage = "https://github.com/open-telemetry/opentelemetry-python-contrib/tree/main/util/opentelemetry-util-genai" +Repository = "https://github.com/open-telemetry/opentelemetry-python-contrib" + +[tool.hatch.version] +path = "src/opentelemetry/util/evaluator/circuit_version.py" + +[tool.hatch.build.targets.sdist] 
+include = [ + "/src", + "/tests", +] + +[tool.hatch.build.targets.wheel] +packages = ["src/opentelemetry"] diff --git a/util/opentelemetry-util-genai-evals-splunk-circuit/src/opentelemetry/util/evaluator/circuit_deepeval.py b/util/opentelemetry-util-genai-evals-splunk-circuit/src/opentelemetry/util/evaluator/circuit_deepeval.py new file mode 100644 index 0000000..5765e29 --- /dev/null +++ b/util/opentelemetry-util-genai-evals-splunk-circuit/src/opentelemetry/util/evaluator/circuit_deepeval.py @@ -0,0 +1,387 @@ +"""DeepEval evaluation model for Cisco CircuIT.""" + +from __future__ import annotations + +import asyncio +import json +import logging +import os +import tempfile +from dataclasses import dataclass +from datetime import datetime, timedelta, timezone +from pathlib import Path +from threading import Lock +from typing import Any + +import requests +from deepeval.models.base_model import DeepEvalBaseLLM +from requests.auth import HTTPBasicAuth + +LOGGER = logging.getLogger(__name__) + + +@dataclass(frozen=True) +class _CircuitConfig: + base_url: str + deployment: str + app_key: str + client_id: str | None + client_secret: str | None + token_url: str + token_cache: Path | None + static_token: str | None + session_id: str | None + user_id: str | None + timeout: float + temperature: float | None + max_tokens: int | None + system_prompt: str | None + + +class _CiscoCircuitTokenManager: + """Cache-aware OAuth client credentials manager for CircuIT.""" + + def __init__( + self, + client_id: str, + client_secret: str, + token_url: str, + cache_path: Path | None, + ) -> None: + self._client_id = client_id + self._client_secret = client_secret + self._token_url = token_url + self._cache_path = cache_path + self._lock = Lock() + + def _read_cache(self) -> str | None: + if not self._cache_path or not self._cache_path.exists(): + return None + try: + payload = json.loads(self._cache_path.read_text("utf-8")) + expires_at = 
datetime.fromisoformat(payload["expires_at"]).replace( + tzinfo=timezone.utc + ) + if datetime.now(timezone.utc) < expires_at - timedelta(minutes=5): + return payload["access_token"] + except Exception: # pragma: no cover - defensive + return None + return None + + def _write_cache(self, token: str, expires_in: int) -> None: + if not self._cache_path: + return + expires_at = datetime.now(timezone.utc) + timedelta( + seconds=max(expires_in, 0) + ) + try: + self._cache_path.parent.mkdir(parents=True, exist_ok=True) + self._cache_path.write_text( + json.dumps( + { + "access_token": token, + "expires_at": expires_at.isoformat(), + } + ), + encoding="utf-8", + ) + os.chmod(self._cache_path, 0o600) + except Exception: # pragma: no cover - defensive + LOGGER.debug( + "Unable to persist CircuIT token cache", exc_info=True + ) + + def _fetch_new_token(self) -> tuple[str, int]: + auth = HTTPBasicAuth(self._client_id, self._client_secret) + response = requests.post( + self._token_url, + headers={"Accept": "application/json"}, + data={"grant_type": "client_credentials"}, + auth=auth, + timeout=30, + ) + response.raise_for_status() + data = response.json() + token = data.get("access_token") + if not token: + raise RuntimeError( + "CircuIT token endpoint did not return an access_token" + ) + expires_in = int(data.get("expires_in", 3600)) + self._write_cache(token, expires_in) + return token, expires_in + + def get_token(self, force_refresh: bool = False) -> str: + with self._lock: + if not force_refresh: + cached = self._read_cache() + if cached: + return cached + token, _ = self._fetch_new_token() + return token + + +def _load_config() -> _CircuitConfig: + base_url = os.getenv( + "CIRCUIT_UPSTREAM_BASE", "https://chat-ai.cisco.com" + ).rstrip("/") + deployment = ( + os.getenv("DEEPEVAL_CIRCUIT_DEPLOYMENT") + or os.getenv("CIRCUIT_DEFAULT_DEPLOYMENT") + or "gpt-4o-mini" + ) + app_key = os.getenv("CISCO_APP_KEY") or os.getenv("CIRCUIT_APP_KEY") + if not app_key: + raise 
RuntimeError( + "CISCO_APP_KEY environment variable is required for CircuIT evaluation" + ) + + client_id = os.getenv("CISCO_CLIENT_ID") + client_secret = os.getenv("CISCO_CLIENT_SECRET") + static_token = os.getenv("CISCO_CIRCUIT_TOKEN") or os.getenv( + "CIRCUIT_ACCESS_TOKEN" + ) + token_url = os.getenv( + "CISCO_TOKEN_URL", "https://id.cisco.com/oauth2/default/v1/token" + ) + token_cache_env = os.getenv("CIRCUIT_TOKEN_CACHE") + if token_cache_env: + token_cache = Path(token_cache_env).expanduser() + else: + token_cache = Path(tempfile.gettempdir()) / "circuit_eval_token.json" + + if not static_token and (not client_id or not client_secret): + raise RuntimeError( + "CircuIT evaluation requires either CISCO_CIRCUIT_TOKEN or both CISCO_CLIENT_ID and CISCO_CLIENT_SECRET" + ) + + session_id = os.getenv("CIRCUIT_SESSION_ID") + user_id = os.getenv("CIRCUIT_USER_ID") + timeout = float(os.getenv("CIRCUIT_TIMEOUT", "60")) + temperature_env = os.getenv("CIRCUIT_TEMPERATURE") + temperature = float(temperature_env) if temperature_env else None + max_tokens_env = os.getenv("CIRCUIT_MAX_TOKENS") + max_tokens = int(max_tokens_env) if max_tokens_env else None + system_prompt = os.getenv("CIRCUIT_SYSTEM_PROMPT") + + return _CircuitConfig( + base_url=base_url, + deployment=deployment, + app_key=app_key, + client_id=client_id, + client_secret=client_secret, + token_url=token_url, + token_cache=token_cache, + static_token=static_token, + session_id=session_id, + user_id=user_id, + timeout=timeout, + temperature=temperature, + max_tokens=max_tokens, + system_prompt=system_prompt, + ) + + +class CiscoCircuitEvaluationLLM(DeepEvalBaseLLM): + """Deepeval model that forwards prompts to the Cisco CircuIT API.""" + + def __init__( + self, config: _CircuitConfig, session: requests.Session | None = None + ) -> None: + self._config = config + self._session = session or requests.Session() + self._endpoint = f"{config.base_url}/openai/deployments/{config.deployment}/chat/completions" + 
self._user_payload = { + "appkey": config.app_key, + "session_id": config.session_id or "", + "user": config.user_id or "", + } + self._token_manager: _CiscoCircuitTokenManager | None = None + self._static_token: str | None + if config.static_token: + self._static_token = config.static_token + elif config.client_id and config.client_secret: + self._token_manager = _CiscoCircuitTokenManager( + config.client_id, + config.client_secret, + config.token_url, + config.token_cache, + ) + self._static_token = None + else: + self._static_token = None + self._model_name = f"circuit://{config.deployment}" + + def load_model(self) -> "CiscoCircuitEvaluationLLM": + return self + + def get_model_name(self) -> str: + return self._model_name + + def generate(self, prompt: str, schema: Any | None = None) -> Any: + text = self._invoke(prompt) + if schema is None: + return text + try: + return self._apply_schema(schema, text) + except Exception as exc: + LOGGER.debug( + "CircuIT schema parsing failed; falling back to raw text", + exc_info=True, + ) + raise TypeError("schema parsing failed") from exc + + async def a_generate(self, prompt: str, schema: Any | None = None) -> Any: + return await asyncio.to_thread(self.generate, prompt, schema) + + def _invoke(self, prompt: str) -> str: + if not prompt or not prompt.strip(): + raise ValueError("Prompt must be a non-empty string") + payload = self._build_payload(prompt) + response = self._post(payload) + raw_body = "" + try: + raw_body = response.text # type: ignore[attr-defined] + except Exception: + raw_body = "" + if not raw_body: + try: + raw_body = json.dumps(response.json()) + except Exception: + raw_body = "" + if os.getenv("CIRCUIT_DEBUG_RAW") == "1": + print( + f"[circuit] status={getattr(response, 'status_code', '?')} body={raw_body}" + ) + if raw_body: + LOGGER.debug("CircuIT raw response: %s", raw_body[:2000]) + try: + data = json.loads(raw_body) if raw_body else response.json() + except ( + json.JSONDecodeError, + TypeError, + 
AttributeError, + ) as exc: # pragma: no cover - defensive + raise RuntimeError("CircuIT response was not valid JSON") from exc + content = self._extract_content(data) + if content is None: + raise RuntimeError( + "CircuIT response did not include message content" + ) + return content + + def _build_payload(self, prompt: str) -> dict[str, Any]: + messages: list[dict[str, str]] = [] + if self._config.system_prompt: + messages.append( + {"role": "system", "content": self._config.system_prompt} + ) + messages.append({"role": "user", "content": prompt}) + payload: dict[str, Any] = { + "messages": messages, + "stream": False, + } + if self._config.temperature is not None: + payload["temperature"] = self._config.temperature + if self._config.max_tokens is not None: + payload["max_tokens"] = self._config.max_tokens + payload["user"] = json.dumps(self._user_payload) + return payload + + def _headers(self, token: str) -> dict[str, str]: + return { + "Content-Type": "application/json", + "Accept": "application/json", + "api-key": token, + } + + def _resolve_token(self, force_refresh: bool = False) -> str: + if self._static_token: + return self._static_token + if not self._token_manager: + raise RuntimeError("CircuIT OAuth credentials are not configured") + return self._token_manager.get_token(force_refresh=force_refresh) + + def _post(self, payload: dict[str, Any]) -> requests.Response: + token = self._resolve_token() + response = self._session.post( + self._endpoint, + headers=self._headers(token), + json=payload, + timeout=self._config.timeout, + ) + if ( + response.status_code == 401 + and not self._static_token + and self._token_manager + ): + LOGGER.info("CircuIT returned 401; refreshing token and retrying") + fresh_token = self._resolve_token(force_refresh=True) + response = self._session.post( + self._endpoint, + headers=self._headers(fresh_token), + json=payload, + timeout=self._config.timeout, + ) + response.raise_for_status() + return response + + @staticmethod + 
def _extract_content(data: dict[str, Any]) -> str | None: + choices = data.get("choices") + if not isinstance(choices, list) or not choices: + return None + first = choices[0] + if not isinstance(first, dict): + return None + message = first.get("message") + if not isinstance(message, dict): + return None + content = message.get("content") + if isinstance(content, str): + return content.strip() + if isinstance(content, list): + for entry in content: + if isinstance(entry, dict): + if entry.get("type") == "text" and isinstance( + entry.get("text"), str + ): + return entry["text"].strip() + if entry.get("type") == "output_text" and isinstance( + entry.get("text"), str + ): + return entry["text"].strip() + elif isinstance(entry, str) and entry.strip(): + return entry.strip() + return None + + @staticmethod + def _apply_schema(schema: Any, payload: str) -> Any: + if hasattr(schema, "model_validate_json"): + return schema.model_validate_json(payload) + if hasattr(schema, "model_validate"): + import json as _json + + data = _json.loads(payload) + return schema.model_validate(data) + if hasattr(schema, "parse_raw"): + return schema.parse_raw(payload) + if callable(schema): + import json as _json + + data = _json.loads(payload) + return schema(**data) + raise TypeError("unsupported schema type") + + +def create_circuit_llm() -> CiscoCircuitEvaluationLLM: + """Factory used by the entry point registry.""" + + config = _load_config() + return CiscoCircuitEvaluationLLM(config) + + +__all__ = [ + "CiscoCircuitEvaluationLLM", + "create_circuit_llm", +] diff --git a/util/opentelemetry-util-genai-evals-splunk-circuit/src/opentelemetry/util/evaluator/circuit_version.py b/util/opentelemetry-util-genai-evals-splunk-circuit/src/opentelemetry/util/evaluator/circuit_version.py new file mode 100644 index 0000000..b2cd131 --- /dev/null +++ b/util/opentelemetry-util-genai-evals-splunk-circuit/src/opentelemetry/util/evaluator/circuit_version.py @@ -0,0 +1,3 @@ +# Copyright The OpenTelemetry 
Authors + +VERSION = "0.1.0" diff --git a/util/opentelemetry-util-genai-evals-splunk-circuit/tests/conftest.py b/util/opentelemetry-util-genai-evals-splunk-circuit/tests/conftest.py new file mode 100644 index 0000000..30637f9 --- /dev/null +++ b/util/opentelemetry-util-genai-evals-splunk-circuit/tests/conftest.py @@ -0,0 +1,16 @@ +import sys +from pathlib import Path + +_pkg_root = Path(__file__).resolve().parents[1] +_src = _pkg_root / "src" +if str(_src) not in sys.path: + sys.path.insert(0, str(_src)) + +# Ensure the base deepeval evaluator package is importable for shared helpers. +_base = ( + Path(__file__).resolve().parents[2] + / "opentelemetry-util-genai-evals-deepeval" + / "src" +) +if _base.exists() and str(_base) not in sys.path: + sys.path.insert(0, str(_base)) diff --git a/util/opentelemetry-util-genai-evals-splunk-circuit/tests/test_circuit_deepeval.py b/util/opentelemetry-util-genai-evals-splunk-circuit/tests/test_circuit_deepeval.py new file mode 100644 index 0000000..3e4da47 --- /dev/null +++ b/util/opentelemetry-util-genai-evals-splunk-circuit/tests/test_circuit_deepeval.py @@ -0,0 +1,130 @@ +import json +from pathlib import Path +from typing import Any + +import pytest + +from opentelemetry.util.evaluator.circuit_deepeval import ( + CiscoCircuitEvaluationLLM, + create_circuit_llm, +) + + +class _DummyResponse: + def __init__( + self, *, status_code: int = 200, payload: dict[str, Any] | None = None + ): + self.status_code = status_code + self._payload = payload or {} + self._text = json.dumps(self._payload) + + def json(self) -> dict[str, Any]: + return self._payload + + @property + def text(self) -> str: + return self._text + + def raise_for_status(self) -> None: + if self.status_code >= 400: + raise RuntimeError(f"status={self.status_code}") + + +def test_create_circuit_llm_requires_app_key(monkeypatch): + monkeypatch.delenv("CISCO_APP_KEY", raising=False) + monkeypatch.delenv("CIRCUIT_APP_KEY", raising=False) + 
monkeypatch.setenv("DEEPEVAL_CIRCUIT_DEPLOYMENT", "demo")
+    with pytest.raises(RuntimeError, match="CISCO_APP_KEY"):
+        create_circuit_llm()
+
+
+def test_static_token_invocation(monkeypatch):
+    captured = {}
+
+    def fake_post(self, url, headers=None, json=None, timeout=None):  # pylint: disable=unused-argument
+        captured["url"] = url
+        captured["headers"] = headers
+        captured["payload"] = json
+        return _DummyResponse(
+            payload={
+                "choices": [
+                    {
+                        "message": {"content": "hello"},
+                    }
+                ]
+            }
+        )
+
+    monkeypatch.setenv("CISCO_APP_KEY", "appkey")
+    monkeypatch.setenv("CISCO_CIRCUIT_TOKEN", "token-123")
+    monkeypatch.setenv("DEEPEVAL_CIRCUIT_DEPLOYMENT", "deployment")
+
+    monkeypatch.setattr("requests.Session.post", fake_post, raising=False)
+
+    model = create_circuit_llm()
+    assert isinstance(model, CiscoCircuitEvaluationLLM)
+    result = model.generate("Hi")
+    assert result == "hello"
+    assert "/openai/deployments/deployment/chat/completions" in captured["url"]
+    user_payload = json.loads(captured["payload"]["user"])
+    assert user_payload["appkey"] == "appkey"
+    assert captured["headers"]["api-key"] == "token-123"
+
+
+def test_token_refresh_flow(monkeypatch, tmp_path):
+    token_cache = tmp_path / "token.json"
+    tokens = ["initial-token", "refreshed-token"]
+
+    def fake_token_post(url, headers=None, data=None, auth=None, timeout=None):  # pylint: disable=unused-argument
+        token = tokens.pop(0)
+        return _DummyResponse(
+            payload={"access_token": token, "expires_in": 3600}
+        )
+
+    responses = [
+        _DummyResponse(status_code=401),
+        _DummyResponse(
+            payload={
+                "choices": [
+                    {
+                        "message": {"content": "refreshed"},
+                    }
+                ]
+            }
+        ),
+    ]
+
+    def fake_session_post(self, url, headers=None, json=None, timeout=None):  # pylint: disable=unused-argument
+        # Responses are consumed in order: the first call yields the
+        # 401 that triggers a token refresh, the second yields the
+        # successful retried completion.
+        return responses.pop(0)
+
+    monkeypatch.setenv("CISCO_APP_KEY", "appkey")
+    monkeypatch.setenv("CISCO_CLIENT_ID", "client")
+
monkeypatch.setenv("CISCO_CLIENT_SECRET", "secret") + monkeypatch.setenv("CIRCUIT_TOKEN_CACHE", str(token_cache)) + monkeypatch.setenv("DEEPEVAL_CIRCUIT_DEPLOYMENT", "deployment") + + monkeypatch.setattr("requests.post", fake_token_post) + monkeypatch.setattr( + "requests.Session.post", fake_session_post, raising=False + ) + + model = create_circuit_llm() + assert model.generate("refresh please") == "refreshed" + cache_content = json.loads(Path(token_cache).read_text("utf-8")) + assert cache_content["access_token"] in { + "initial-token", + "refreshed-token", + } + + +def test_missing_credentials(monkeypatch): + monkeypatch.delenv("CISCO_CIRCUIT_TOKEN", raising=False) + monkeypatch.delenv("CISCO_CLIENT_ID", raising=False) + monkeypatch.delenv("CISCO_CLIENT_SECRET", raising=False) + monkeypatch.setenv("CISCO_APP_KEY", "appkey") + monkeypatch.setenv("DEEPEVAL_CIRCUIT_DEPLOYMENT", "deployment") + with pytest.raises(RuntimeError, match="CISCO_CIRCUIT_TOKEN"): + create_circuit_llm() From be24691ed7dba2fad10d36b3591104d19bf93e08 Mon Sep 17 00:00:00 2001 From: Keith Decker Date: Tue, 25 Nov 2025 12:17:06 -0700 Subject: [PATCH 2/3] Lint Fixes --- .../examples/multi_agent_travel_planner/main.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/main.py b/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/main.py index 358080d..ab475c4 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/main.py +++ b/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/main.py @@ -29,7 +29,7 @@ import random import time from datetime import datetime, timedelta -from typing import Annotated, Dict, List, Optional, TypedDict +from typing import Annotated, List, Optional, TypedDict from uuid import uuid4 from langchain_core.messages 
import ( From 828729574a1efb93af78f4fcf27780c2d62fda2d Mon Sep 17 00:00:00 2001 From: Keith Decker Date: Tue, 25 Nov 2025 12:20:36 -0700 Subject: [PATCH 3/3] Lint fixes --- .../circuit_support.py | 36 ++++++++++++++----- .../multi_agent_travel_planner/main.py | 1 + 2 files changed, 28 insertions(+), 9 deletions(-) diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/circuit_support.py b/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/circuit_support.py index 7e44264..4f72983 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/circuit_support.py +++ b/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/circuit_support.py @@ -145,11 +145,15 @@ def _write_cached_token(cache_path: Path | None, token: str, expires_in: int) -> _debug_token_message("Unable to persist CircuIT token cache") -def _fetch_circuit_token(client_id: str, client_secret: str, token_url: str) -> tuple[str, int]: +def _fetch_circuit_token( + client_id: str, client_secret: str, token_url: str +) -> tuple[str, int]: try: requests = importlib.import_module("requests") except ModuleNotFoundError as exc: # pragma: no cover - dependency guard - raise RuntimeError("requests is required to mint Cisco CircuIT access tokens") from exc + raise RuntimeError( + "requests is required to mint Cisco CircuIT access tokens" + ) from exc response = requests.post( token_url, @@ -181,10 +185,16 @@ def _mint_circuit_token(default_cache_filename: str) -> tuple[str, str, Optional return cached, "oauth-cache", cache_path client_id = os.getenv("CISCO_CLIENT_ID") or os.getenv("CIRCUIT_CLIENT_ID") - client_secret = os.getenv("CISCO_CLIENT_SECRET") or os.getenv("CIRCUIT_CLIENT_SECRET") + client_secret = os.getenv("CISCO_CLIENT_SECRET") or os.getenv( + "CIRCUIT_CLIENT_SECRET" + ) if not client_id or not client_secret: - 
raise RuntimeError("Set CISCO_CIRCUIT_TOKEN or provide Cisco OAuth client credentials") - token_url = os.getenv("CISCO_TOKEN_URL", "https://id.cisco.com/oauth2/default/v1/token") + raise RuntimeError( + "Set CISCO_CIRCUIT_TOKEN or provide Cisco OAuth client credentials" + ) + token_url = os.getenv( + "CISCO_TOKEN_URL", "https://id.cisco.com/oauth2/default/v1/token" + ) token, expires_in = _fetch_circuit_token(client_id, client_secret, token_url) _write_cached_token(cache_path, token, expires_in) return token, "oauth-fetch", cache_path @@ -201,7 +211,9 @@ def _augment_circuit_kwargs( app_key = os.getenv("CISCO_APP_KEY") or os.getenv("CIRCUIT_APP_KEY") if not app_key: - raise RuntimeError("CISCO_APP_KEY (or CIRCUIT_APP_KEY) must be set when using CircuIT") + raise RuntimeError( + "CISCO_APP_KEY (or CIRCUIT_APP_KEY) must be set when using CircuIT" + ) raw_base = kwargs.pop("base_url", None) or os.getenv("CIRCUIT_API_BASE") if raw_base: @@ -211,7 +223,9 @@ def _augment_circuit_kwargs( if "/openai/deployments/" not in sanitized: sanitized = f"{sanitized}/openai/deployments/{model}" else: - upstream = os.getenv("CIRCUIT_UPSTREAM_BASE", "https://chat-ai.cisco.com").rstrip("/") + upstream = os.getenv( + "CIRCUIT_UPSTREAM_BASE", "https://chat-ai.cisco.com" + ).rstrip("/") sanitized = f"{upstream}/openai/deployments/{model}" if sanitized.endswith("/chat/completions"): sanitized = sanitized[: -len("/chat/completions")] @@ -262,13 +276,17 @@ def _augment_circuit_kwargs( if token_source_label == "kwargs": _debug_token_message("using CircuIT token supplied via kwargs", preview) else: - env_name = token_source_label.split(":", 1)[-1] if token_source_label else "env" + env_name = ( + token_source_label.split(":", 1)[-1] if token_source_label else "env" + ) _debug_token_message(f"using CircuIT token from {env_name}", preview) if ignored_sources: joined = ",".join(sorted(set(ignored_sources))) debug["ignored_token_sources"] = joined - _debug_token_message(f"force OAuth enabled so 
ignoring static token from {joined}") + _debug_token_message( + f"force OAuth enabled so ignoring static token from {joined}" + ) debug["token_source"] = token_source_label or "unknown" debug["token_hint"] = preview diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/main.py b/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/main.py index ab475c4..10f5336 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/main.py +++ b/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/main.py @@ -183,6 +183,7 @@ class PoisonConfig(TypedDict): def _provider_setting() -> str: return os.getenv("TRAVEL_LLM_PROVIDER", "").strip().lower() + def _create_llm(agent_name: str, *, temperature: float, session_id: str) -> ChatOpenAI: """Create an LLM instance decorated with tags/metadata for tracing.""" provider_setting = _provider_setting()