diff --git a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/README.rst b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/README.rst new file mode 100644 index 0000000..bf9ea59 --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/README.rst @@ -0,0 +1,217 @@ +OpenTelemetry LlamaIndex Instrumentation +========================================= + +This library provides automatic instrumentation for LlamaIndex applications using OpenTelemetry. + +Installation +------------ + +Development installation:: + + # Install the package in editable mode + cd instrumentation-genai/opentelemetry-instrumentation-llamaindex + pip install -e . + + # Install test dependencies + pip install -e ".[test]" + + # Install util-genai (required for telemetry) + cd ../../util/opentelemetry-util-genai + pip install -e . + + +Quick Start +----------- + +.. code-block:: python + + import os + from opentelemetry.instrumentation.llamaindex import LlamaindexInstrumentor + from opentelemetry import trace, metrics + from opentelemetry.sdk.trace import TracerProvider + from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor + from opentelemetry.sdk.metrics import MeterProvider + from opentelemetry.sdk.metrics.export import InMemoryMetricReader + + # Enable metrics (default is spans only) + os.environ["OTEL_INSTRUMENTATION_GENAI_EMITTERS"] = "span_metric" + + # Setup tracing + trace.set_tracer_provider(TracerProvider()) + trace.get_tracer_provider().add_span_processor( + SimpleSpanProcessor(ConsoleSpanExporter()) + ) + + # Setup metrics + metric_reader = InMemoryMetricReader() + meter_provider = MeterProvider(metric_readers=[metric_reader]) + metrics.set_meter_provider(meter_provider) + + # Enable instrumentation with providers + LlamaindexInstrumentor().instrument( + tracer_provider=trace.get_tracer_provider(), + meter_provider=meter_provider + ) + + # Use LlamaIndex as normal + from llama_index.llms.openai import OpenAI + from llama_index.core.llms import ChatMessage, MessageRole + + llm = OpenAI(model="gpt-3.5-turbo") + messages = [ChatMessage(role=MessageRole.USER, content="Hello")] + response = llm.chat(messages) + + +Running Tests +------------- + +**LLM Tests**: + +.. code-block:: bash + + # Set environment variables + export OPENAI_API_KEY=your-api-key + export OTEL_INSTRUMENTATION_GENAI_EMITTERS=span_metric + + # Run the test + cd tests + python test_llm_instrumentation.py + +**Embedding Tests**: + +.. code-block:: bash + + # Set environment variables + export OPENAI_API_KEY=your-api-key + export OTEL_INSTRUMENTATION_GENAI_EMITTERS=span_metric + + # Run the test + cd tests + python test_embedding_instrumentation.py + + +Expected Output +--------------- + +**LLM Span Attributes**:: + + { + "gen_ai.framework": "llamaindex", + "gen_ai.request.model": "gpt-3.5-turbo", + "gen_ai.operation.name": "chat", + "gen_ai.usage.input_tokens": 24, + "gen_ai.usage.output_tokens": 7 + } + +**Embedding Span Attributes**:: + + { + "gen_ai.operation.name": "embeddings", + "gen_ai.request.model": "text-embedding-3-small", + "gen_ai.provider.name": "openai", + "gen_ai.embeddings.dimension.count": 1536 + } + +**Metrics**:: + + Metric: gen_ai.client.operation.duration + Duration: 0.6900 seconds + Count: 1 + + Metric: gen_ai.client.token.usage + Token type: input, Sum: 24, Count: 1 + Token type: output, Sum: 7, Count: 1 + + +Key Implementation Differences from LangChain +---------------------------------------------- + +**1. 
Event-Based Callbacks** + +LlamaIndex uses ``on_event_start(event_type, ...)`` and ``on_event_end(event_type, ...)`` +instead of LangChain's method-based callbacks (``on_llm_start``, ``on_llm_end``). + +Event types are dispatched via ``CBEventType`` enum:: + + CBEventType.LLM # LLM invocations (chat, complete) + CBEventType.AGENT # Agent steps (not yet instrumented) + CBEventType.EMBEDDING # Embedding operations (get_text_embedding, get_text_embedding_batch) + +**2. Handler Registration** + +LlamaIndex uses ``handlers`` list:: + + callback_manager.handlers.append(handler) + +LangChain uses ``inheritable_handlers``:: + + callback_manager.inheritable_handlers.append(handler) + +**3. Response Structure** + +LlamaIndex ``ChatMessage`` uses ``blocks`` (list of TextBlock objects):: + + message.content # Computed property from blocks[0].text + +LangChain uses simple strings:: + + message.content # Direct string property + +**4. Token Usage** + +LlamaIndex returns objects (not dicts):: + + response.raw.usage.prompt_tokens # Object attribute + response.raw.usage.completion_tokens # Object attribute + +LangChain returns dicts:: + + response["usage"]["prompt_tokens"] # Dict key + response["usage"]["completion_tokens"] # Dict key + + +Supported Features +------------------ + +**LLM Operations** + +* ✅ Chat completion (``llm.chat()``, ``llm.stream_chat()``) +* ✅ Text completion (``llm.complete()``, ``llm.stream_complete()``) +* ✅ Token usage tracking +* ✅ Model name detection +* ✅ Framework attribution + +**Embedding Operations** + +* ✅ Single text embedding (``embed_model.get_text_embedding()``) +* ✅ Batch embedding (``embed_model.get_text_embedding_batch()``) +* ✅ Query embedding (``embed_model.get_query_embedding()``) +* ✅ Provider detection (OpenAI, Azure, AWS Bedrock, Google, Cohere, HuggingFace, Ollama, and more) +* ✅ Dimension count tracking +* ✅ Input text capture + +**Provider Detection** + +Embedding instrumentation automatically detects the provider from class names: + +* **OpenAI**: ``OpenAIEmbedding`` +* **Azure**: ``AzureOpenAIEmbedding`` +* **AWS**: ``BedrockEmbedding`` +* **Google**: ``GeminiEmbedding``, ``VertexTextEmbedding``, ``GooglePaLMEmbedding`` +* **Cohere**: ``CohereEmbedding`` +* **HuggingFace**: ``HuggingFaceEmbedding``, ``HuggingFaceInferenceAPIEmbedding`` +* **Ollama**: ``OllamaEmbedding`` +* **Anthropic**: ``AnthropicEmbedding`` +* **MistralAI**: ``MistralAIEmbedding`` +* **Together**: ``TogetherEmbedding`` +* **Fireworks**: ``FireworksEmbedding`` +* **Voyage**: ``VoyageEmbedding`` +* **Jina**: ``JinaEmbedding`` + + +References +---------- + +* `OpenTelemetry Project `_ +* `LlamaIndex `_ +* `LlamaIndex Callbacks `_ diff --git a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/pyproject.toml b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/pyproject.toml new file mode 100644 index 0000000..55a6708 --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/pyproject.toml @@ -0,0 +1,58 @@ +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[project] +name = "splunk-otel-instrumentation-llamaindex" +dynamic = ["version"] +description = "OpenTelemetry LlamaIndex instrumentation" +readme = "README.rst" +license = "Apache-2.0" +requires-python = ">=3.9" +authors = [ + { name = "OpenTelemetry Authors", email = "cncf-opentelemetry-contributors@lists.cncf.io" }, +] +classifiers = [ + "Development Status :: 4 - Beta", + "Intended Audience :: Developers", + "License :: OSI Approved :: Apache Software 
License", + "Programming Language :: Python", + "Programming Language :: Python :: 3.9", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", + "Programming Language :: Python :: 3.13", +] +dependencies = [ + "opentelemetry-api ~= 1.38.0.dev0", + "opentelemetry-instrumentation ~= 0.59b0.dev0", + "opentelemetry-semantic-conventions ~= 0.59b0.dev0", + "splunk-otel-util-genai>=0.1.4", +] + +[project.optional-dependencies] +instruments = ["llama-index-core >= 0.14.0"] +test = [ + "llama-index-core >= 0.14.0", + "llama-index-llms-openai >= 0.6.0", + "pytest >= 7.0.0", +] + +[project.entry-points.opentelemetry_instrumentor] +llamaindex = "opentelemetry.instrumentation.llamaindex:LlamaindexInstrumentor" + +[project.urls] +Homepage = "https://github.com/open-telemetry/opentelemetry-python-contrib/tree/main/instrumentation-genai/opentelemetry-instrumentation-llamaindex" +Repository = "https://github.com/open-telemetry/opentelemetry-python-contrib" + +[tool.hatch.version] +path = "src/opentelemetry/instrumentation/llamaindex/version.py" + +[tool.hatch.build.targets.sdist] +include = ["/src", "/tests"] + +[tool.hatch.build.targets.wheel] +packages = ["src/opentelemetry"] + +[tool.ruff] +exclude = ["./"] diff --git a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/__init__.py b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/__init__.py new file mode 100644 index 0000000..7aeb05c --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/__init__.py @@ -0,0 +1,81 @@ +from opentelemetry.instrumentation.instrumentor import BaseInstrumentor +from opentelemetry.util.genai.handler import get_telemetry_handler +from opentelemetry.instrumentation.llamaindex.config import Config +from opentelemetry.instrumentation.llamaindex.callback_handler import ( + LlamaindexCallbackHandler, +) +from opentelemetry.instrumentation.llamaindex.workflow_instrumentation import ( + wrap_agent_run, +) +from wrapt import wrap_function_wrapper + +_instruments = ("llama-index-core >= 0.14.0",) + + +class LlamaindexInstrumentor(BaseInstrumentor): + def __init__( + self, + exception_logger=None, + disable_trace_context_propagation=False, + use_legacy_attributes: bool = True, + ): + super().__init__() + Config._exception_logger = exception_logger + Config.use_legacy_attributes = use_legacy_attributes + self._disable_trace_context_propagation = ( + disable_trace_context_propagation + ) + self._telemetry_handler = None + + def instrumentation_dependencies(self): + return _instruments + + def _instrument(self, **kwargs): + tracer_provider = kwargs.get("tracer_provider") + meter_provider = kwargs.get("meter_provider") + logger_provider = kwargs.get("logger_provider") + + self._telemetry_handler = get_telemetry_handler( + tracer_provider=tracer_provider, + meter_provider=meter_provider, + logger_provider=logger_provider, + ) + + llamaindexCallBackHandler = LlamaindexCallbackHandler( + telemetry_handler=self._telemetry_handler + ) + + wrap_function_wrapper( + module="llama_index.core.callbacks.base", + name="CallbackManager.__init__", + wrapper=_BaseCallbackManagerInitWrapper(llamaindexCallBackHandler), + ) + + # Instrument Workflow-based agents + try: + wrap_function_wrapper( + module="llama_index.core.agent", + name="ReActAgent.run", + wrapper=wrap_agent_run, + ) + except 
Exception: + # ReActAgent might not be available or importable + pass + + def _uninstrument(self, **kwargs): + pass + + +class _BaseCallbackManagerInitWrapper: + def __init__(self, callback_handler: "LlamaindexCallbackHandler"): + self._callback_handler = callback_handler + + def __call__(self, wrapped, instance, args, kwargs) -> None: + wrapped(*args, **kwargs) + # LlamaIndex uses 'handlers' instead of 'inheritable_handlers' + for handler in instance.handlers: + if isinstance(handler, type(self._callback_handler)): + break + else: + self._callback_handler._callback_manager = instance + instance.add_handler(self._callback_handler) diff --git a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/callback_handler.py b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/callback_handler.py new file mode 100644 index 0000000..c7f00ad --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/callback_handler.py @@ -0,0 +1,487 @@ +from typing import Any, Dict, Optional + +from llama_index.core.callbacks.base_handler import BaseCallbackHandler +from llama_index.core.callbacks.schema import CBEventType + +from opentelemetry.util.genai.handler import TelemetryHandler +from opentelemetry.util.genai.types import ( + AgentInvocation, + EmbeddingInvocation, + InputMessage, + LLMInvocation, + OutputMessage, + Step, + Text, + ToolCall, + Workflow, +) + +from .vendor_detection import detect_vendor_from_class + + +def _safe_str(value: Any) -> str: + """Safely convert value to string.""" + try: + return str(value) + except (TypeError, ValueError): + return "" + + +class LlamaindexCallbackHandler(BaseCallbackHandler): + """LlamaIndex callback handler supporting LLM and Embedding instrumentation.""" + + def __init__( + self, + telemetry_handler: Optional[TelemetryHandler] = None, + ) -> None: + super().__init__( + event_starts_to_ignore=[], + event_ends_to_ignore=[], + ) + self._handler = telemetry_handler + + def start_trace(self, trace_id: Optional[str] = None) -> None: + """Start a trace - required by BaseCallbackHandler.""" + pass + + def end_trace( + self, + trace_id: Optional[str] = None, + trace_map: Optional[Dict[str, Any]] = None, + ) -> None: + """End a trace - required by BaseCallbackHandler.""" + pass + + def on_event_start( + self, + event_type: CBEventType, + payload: Optional[Dict[str, Any]] = None, + event_id: str = "", + parent_id: str = "", + **kwargs: Any, + ) -> str: + """Handle event start - processing LLM, EMBEDDING, AGENT_STEP, and FUNCTION_CALL events.""" + if event_type == CBEventType.LLM: + self._handle_llm_start(event_id, parent_id, payload, **kwargs) + elif event_type == CBEventType.EMBEDDING: + self._handle_embedding_start(event_id, parent_id, payload, **kwargs) + elif event_type == CBEventType.AGENT_STEP: + self._handle_agent_step_start(event_id, parent_id, payload, **kwargs) + elif event_type == CBEventType.FUNCTION_CALL: + self._handle_function_call_start(event_id, parent_id, payload, **kwargs) + return event_id + + def on_event_end( + self, + event_type: CBEventType, + payload: Optional[Dict[str, Any]] = None, + event_id: str = "", + **kwargs: Any, + ) -> None: + """Handle event end - processing LLM, EMBEDDING, AGENT_STEP, and FUNCTION_CALL events.""" + if event_type == CBEventType.LLM: + self._handle_llm_end(event_id, payload, **kwargs) + elif event_type == CBEventType.EMBEDDING: + 
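            # Embedding inputs (the "chunks") and the output vectors are only
            # present in the end-event payload, so input_texts and
            # dimension_count are filled in by _handle_embedding_end below.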
self._handle_embedding_end(event_id, payload, **kwargs) + elif event_type == CBEventType.AGENT_STEP: + self._handle_agent_step_end(event_id, payload, **kwargs) + elif event_type == CBEventType.FUNCTION_CALL: + self._handle_function_call_end(event_id, payload, **kwargs) + + def _handle_llm_start( + self, + event_id: str, + parent_id: str, + payload: Optional[Dict[str, Any]] = None, + **kwargs: Any, + ) -> None: + """Handle LLM invocation start.""" + if not self._handler or not payload: + return + + # Extract model information from payload + serialized = payload.get("serialized", {}) + model_name = ( + serialized.get("model") + or serialized.get("model_name") + or "unknown" + ) + + # Extract messages from payload + # LlamaIndex messages are ChatMessage objects with .content and .role properties + messages = payload.get("messages", []) + input_messages = [] + + for msg in messages: + # Handle ChatMessage objects (has .content property and .role attribute) + if hasattr(msg, "content") and hasattr(msg, "role"): + # Extract role - could be MessageRole enum + role_value = ( + str(msg.role.value) + if hasattr(msg.role, "value") + else str(msg.role) + ) + # Extract content - this is a property that pulls from blocks[0].text + content = _safe_str(msg.content) + input_messages.append( + InputMessage( + role=role_value, parts=[Text(content=content)] + ) + ) + elif isinstance(msg, dict): + # Handle serialized messages (dict format) + role = msg.get("role", "user") + # Try to extract from blocks first (LlamaIndex format) + blocks = msg.get("blocks", []) + if blocks and isinstance(blocks[0], dict): + content = blocks[0].get("text", "") + else: + # Fallback to direct content field + content = msg.get("content", "") + + role_value = ( + str(role.value) if hasattr(role, "value") else str(role) + ) + input_messages.append( + InputMessage( + role=role_value, + parts=[Text(content=_safe_str(content))], + ) + ) + + # Create LLM invocation with event_id as run_id + llm_inv = LLMInvocation( + request_model=_safe_str(model_name), + input_messages=input_messages, + attributes={}, + run_id=event_id, # Use event_id as run_id for registry lookup + ) + llm_inv.framework = "llamaindex" + + # Start the LLM invocation (handler stores it in _entity_registry) + self._handler.start_llm(llm_inv) + + def _handle_llm_end( + self, + event_id: str, + payload: Optional[Dict[str, Any]], + **kwargs: Any, + ) -> None: + """Handle LLM invocation end.""" + if not self._handler: + return + + # Get the LLM invocation from handler's registry using event_id + llm_inv = self._handler.get_entity(event_id) + if not llm_inv or not isinstance(llm_inv, LLMInvocation): + return + + if payload: + # Extract response from payload + response = payload.get("response") + + # Handle both dict and object types for response + if response: + # Get message - could be dict or object + if isinstance(response, dict): + message = response.get("message", {}) + raw_response = response.get("raw") + else: + # response is a ChatResponse object + message = getattr(response, "message", None) + raw_response = getattr(response, "raw", None) + + # Extract content from message + if message: + if isinstance(message, dict): + # Message is dict + blocks = message.get("blocks", []) + if blocks and isinstance(blocks[0], dict): + content = blocks[0].get("text", "") + else: + content = message.get("content", "") + else: + # Message is ChatMessage object + blocks = getattr(message, "blocks", []) + if blocks and len(blocks) > 0: + content = getattr(blocks[0], "text", "") + else: + 
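                            # No blocks on the message: fall back to the plain .content property.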
content = getattr(message, "content", "") + + # Create output message + llm_inv.output_messages = [ + OutputMessage( + role="assistant", + parts=[Text(content=_safe_str(content))], + finish_reason="stop", + ) + ] + + # Extract token usage from response.raw (OpenAI format) + # LlamaIndex stores the raw API response (e.g., OpenAI response) in response.raw + # raw_response could be a dict or an object (e.g., ChatCompletion from OpenAI) + if raw_response: + # Try to get usage from dict or object + if isinstance(raw_response, dict): + usage = raw_response.get("usage", {}) + else: + # It's an object, try to get usage attribute + usage = getattr(raw_response, "usage", None) + + if usage: + # usage could also be dict or object + if isinstance(usage, dict): + llm_inv.input_tokens = usage.get("prompt_tokens") + llm_inv.output_tokens = usage.get("completion_tokens") + else: + llm_inv.input_tokens = getattr(usage, "prompt_tokens", None) + llm_inv.output_tokens = getattr(usage, "completion_tokens", None) + + # Stop the LLM invocation + self._handler.stop_llm(llm_inv) + + def _handle_embedding_start( + self, + event_id: str, + parent_id: str, + payload: Optional[Dict[str, Any]] = None, + **kwargs: Any, + ) -> None: + """Handle embedding invocation start.""" + if not self._handler or not payload: + return + + # Extract model information from payload + serialized = payload.get("serialized", {}) + model_name = ( + serialized.get("model_name") + or serialized.get("model") + or "unknown" + ) + + # Detect provider from class name + class_name = serialized.get("class_name", "") + provider = detect_vendor_from_class(class_name) + + # Note: input texts are not available at start time in LlamaIndex + # They will be available in the end event payload + + # Create embedding invocation with event_id as run_id + emb_inv = EmbeddingInvocation( + request_model=_safe_str(model_name), + input_texts=[], # Will be populated on end event + provider=provider, + attributes={}, + run_id=event_id, + ) + emb_inv.framework = "llamaindex" + + # Start the embedding invocation + self._handler.start_embedding(emb_inv) + + def _handle_embedding_end( + self, + event_id: str, + payload: Optional[Dict[str, Any]], + **kwargs: Any, + ) -> None: + """Handle embedding invocation end.""" + if not self._handler: + return + + # Get the embedding invocation from handler's registry using event_id + emb_inv = self._handler.get_entity(event_id) + if not emb_inv or not isinstance(emb_inv, EmbeddingInvocation): + return + + if payload: + # Extract input chunks (texts) from response + # chunks is the list of input texts that were embedded + chunks = payload.get("chunks", []) + if chunks: + emb_inv.input_texts = [_safe_str(chunk) for chunk in chunks] + + # Extract embedding vectors from response + # embeddings is the list of output vectors + embeddings = payload.get("embeddings", []) + + # Determine dimension from first embedding vector + if embeddings and len(embeddings) > 0: + first_embedding = embeddings[0] + if isinstance(first_embedding, list): + emb_inv.dimension_count = len(first_embedding) + elif hasattr(first_embedding, "__len__"): + emb_inv.dimension_count = len(first_embedding) + + # Stop the embedding invocation + self._handler.stop_embedding(emb_inv) + + def _find_nearest_agent(self, parent_id: Optional[str]) -> Optional[AgentInvocation]: + """Walk up parent chain to find the nearest agent invocation.""" + if not self._handler: + return None + current_id = parent_id + while current_id: + entity = self._handler.get_entity(current_id) + if 
isinstance(entity, AgentInvocation): + return entity + # Move to parent + current_id = getattr(entity, "parent_run_id", None) + if current_id: + current_id = str(current_id) + return None + + def _handle_agent_step_start( + self, + event_id: str, + parent_id: str, + payload: Optional[Dict[str, Any]] = None, + **kwargs: Any, + ) -> None: + """Handle agent step start - create AgentInvocation span.""" + if not self._handler or not payload: + return + + # Extract agent information from payload + task_id = payload.get("task_id", "") + input_text = payload.get("input") + step = payload.get("step") # TaskStep object with agent metadata + + # Extract agent metadata from step or payload + agent_name = None + agent_type = None + agent_description = None + model_name = None + + if step and hasattr(step, "step_state"): + # Try to get agent from step state + step_state = step.step_state + if hasattr(step_state, "agent"): + agent = step_state.agent + agent_name = getattr(agent, "name", None) + agent_type = getattr(agent, "agent_type", None) or type(agent).__name__ + agent_description = getattr(agent, "description", None) + # Try to get model from agent's LLM + if hasattr(agent, "llm"): + llm = agent.llm + model_name = getattr(llm, "model", None) or getattr(llm, "model_name", None) + + # Create AgentInvocation for the agent execution + agent_invocation = AgentInvocation( + name=f"agent.task.{task_id}" if task_id else "agent.invoke", + run_id=event_id, + parent_run_id=parent_id if parent_id else None, + input_context=input_text if input_text else "", + attributes={}, + ) + agent_invocation.framework = "llamaindex" + + # Set enhanced metadata + if agent_name: + agent_invocation.agent_name = _safe_str(agent_name) + if agent_type: + agent_invocation.agent_type = _safe_str(agent_type) + if agent_description: + agent_invocation.description = _safe_str(agent_description) + if model_name: + agent_invocation.model = _safe_str(model_name) + + self._handler.start_agent_invocation(agent_invocation) + + def _handle_agent_step_end( + self, + event_id: str, + payload: Optional[Dict[str, Any]], + **kwargs: Any, + ) -> None: + """Handle agent step end.""" + if not self._handler: + return + + agent_invocation = self._handler.get_entity(event_id) + if not agent_invocation or not isinstance(agent_invocation, AgentInvocation): + return + + if payload: + # Extract response/output if available + response = payload.get("response") + if response: + agent_invocation.output_result = _safe_str(response) + + # Stop the agent invocation + self._handler.stop_agent_invocation(agent_invocation) + + def _handle_function_call_start( + self, + event_id: str, + parent_id: str, + payload: Optional[Dict[str, Any]] = None, + **kwargs: Any, + ) -> None: + """Handle function/tool call start.""" + if not self._handler or not payload: + return + + # Extract tool information + tool = payload.get("tool") + if not tool: + return + + tool_name = getattr(tool, "name", "unknown_tool") if hasattr(tool, "name") else "unknown_tool" + tool_description = getattr(tool, "description", "") if hasattr(tool, "description") else "" + + # Extract function arguments + function_call = payload.get("function_call", {}) + arguments = function_call if function_call else {} + + # Find nearest agent for context propagation + context_agent = self._find_nearest_agent(parent_id) if parent_id else None + + # Create ToolCall entity + tool_call = ToolCall( + name=tool_name, + arguments=arguments, + id=event_id, + ) + + # Set attributes + tool_call.attributes = { + 
"tool.description": tool_description, + } + tool_call.run_id = event_id # type: ignore[attr-defined] + tool_call.parent_run_id = parent_id if parent_id else None # type: ignore[attr-defined] + tool_call.framework = "llamaindex" # type: ignore[attr-defined] + + # Propagate agent context to tool call + if context_agent: + agent_name = getattr(context_agent, "agent_name", None) or getattr(context_agent, "name", None) + if agent_name: + tool_call.agent_name = _safe_str(agent_name) # type: ignore[attr-defined] + tool_call.agent_id = str(context_agent.run_id) # type: ignore[attr-defined] + + # Start the tool call + self._handler.start_tool_call(tool_call) + + def _handle_function_call_end( + self, + event_id: str, + payload: Optional[Dict[str, Any]], + **kwargs: Any, + ) -> None: + """Handle function/tool call end.""" + if not self._handler: + return + + tool_call = self._handler.get_entity(event_id) + if not tool_call or not isinstance(tool_call, ToolCall): + return + + if payload: + # Extract tool output/result + tool_output = payload.get("tool_output") + if tool_output: + # Store the result as response + tool_call.response = _safe_str(tool_output) # type: ignore[attr-defined] + + # Stop the tool call + self._handler.stop_tool_call(tool_call) diff --git a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/config.py b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/config.py new file mode 100644 index 0000000..44199c0 --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/config.py @@ -0,0 +1,3 @@ +class Config: + exception_logger = None + use_legacy_attributes = True diff --git a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/vendor_detection.py b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/vendor_detection.py new file mode 100644 index 0000000..6f9c9f0 --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/vendor_detection.py @@ -0,0 +1,119 @@ +"""Vendor detection for LlamaIndex embedding providers.""" + +from dataclasses import dataclass +from typing import List, Set + + +@dataclass(frozen=True) +class VendorRule: + """Rule for detecting vendor from LlamaIndex class names.""" + + exact_matches: Set[str] + patterns: List[str] + vendor_name: str + + def matches(self, class_name: str) -> bool: + """Check if class name matches this vendor rule.""" + if class_name in self.exact_matches: + return True + class_lower = class_name.lower() + return any(pattern in class_lower for pattern in self.patterns) + + +def _get_vendor_rules() -> List[VendorRule]: + """ + Get vendor detection rules ordered by specificity (most specific first). 
+ + Returns: + List of VendorRule objects for detecting embedding vendors from class names + """ + return [ + VendorRule( + exact_matches={"AzureOpenAIEmbedding"}, + patterns=["azure"], + vendor_name="azure", + ), + VendorRule( + exact_matches={"OpenAIEmbedding"}, + patterns=["openai"], + vendor_name="openai", + ), + VendorRule( + exact_matches={"BedrockEmbedding"}, + patterns=["bedrock", "aws"], + vendor_name="aws", + ), + VendorRule( + exact_matches={"VertexTextEmbedding", "GeminiEmbedding", "GooglePaLMEmbedding"}, + patterns=["vertex", "google", "palm", "gemini"], + vendor_name="google", + ), + VendorRule( + exact_matches={"CohereEmbedding"}, + patterns=["cohere"], + vendor_name="cohere", + ), + VendorRule( + exact_matches={"HuggingFaceEmbedding", "HuggingFaceInferenceAPIEmbedding"}, + patterns=["huggingface"], + vendor_name="huggingface", + ), + VendorRule( + exact_matches={"OllamaEmbedding"}, + patterns=["ollama"], + vendor_name="ollama", + ), + VendorRule( + exact_matches={"AnthropicEmbedding"}, + patterns=["anthropic"], + vendor_name="anthropic", + ), + VendorRule( + exact_matches={"MistralAIEmbedding"}, + patterns=["mistral"], + vendor_name="mistralai", + ), + VendorRule( + exact_matches={"TogetherEmbedding"}, + patterns=["together"], + vendor_name="together", + ), + VendorRule( + exact_matches={"FireworksEmbedding"}, + patterns=["fireworks"], + vendor_name="fireworks", + ), + VendorRule( + exact_matches={"VoyageEmbedding"}, + patterns=["voyage"], + vendor_name="voyage", + ), + VendorRule( + exact_matches={"JinaEmbedding"}, + patterns=["jina"], + vendor_name="jina", + ), + ] + + +def detect_vendor_from_class(class_name: str) -> str: + """ + Detect vendor from LlamaIndex embedding class name. + Uses unified detection rules combining exact matches and patterns. + + Args: + class_name: The class name from serialized embedding information + + Returns: + Vendor string (lowercase), defaults to None if no match found + """ + if not class_name: + return None + + vendor_rules = _get_vendor_rules() + + for rule in vendor_rules: + if rule.matches(class_name): + return rule.vendor_name + + return None diff --git a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/version.py b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/version.py new file mode 100644 index 0000000..3dc1f76 --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/version.py @@ -0,0 +1 @@ +__version__ = "0.1.0" diff --git a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/workflow_instrumentation.py b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/workflow_instrumentation.py new file mode 100644 index 0000000..8449c3e --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/src/opentelemetry/instrumentation/llamaindex/workflow_instrumentation.py @@ -0,0 +1,166 @@ +""" +Workflow-based agent instrumentation for LlamaIndex. + +This module provides instrumentation for Workflow-based agents (ReActAgent, etc.) +by intercepting workflow event streams to capture agent steps and tool calls. 
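ReActAgent.run() is wrapped so that AgentInput, ToolCall, ToolCallResult, and
AgentOutput events read from the returned workflow handler's event stream are
translated into AgentInvocation and ToolCall telemetry.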
+""" + +import asyncio +from typing import Any, Optional +from uuid import uuid4 + +from opentelemetry.util.genai.handler import TelemetryHandler +from opentelemetry.util.genai.types import AgentInvocation, ToolCall + + +class WorkflowEventInstrumentor: + """Instrumentor that wraps WorkflowHandler to capture agent and tool events.""" + + def __init__(self, handler: TelemetryHandler): + self._handler = handler + self._active_agents = {} # event_id -> AgentInvocation + self._active_tools = {} # tool_id -> ToolCall + + async def instrument_workflow_handler(self, workflow_handler, initial_message: str): + """ + Instrument a WorkflowHandler by streaming its events and creating telemetry spans. + + Args: + workflow_handler: The WorkflowHandler returned by agent.run() + initial_message: The user's initial message to the agent + """ + from llama_index.core.agent.workflow.workflow_events import ( + AgentInput, + AgentOutput, + ToolCall as WorkflowToolCall, + ToolCallResult, + ) + + agent_invocation = None + agent_run_id = None + + try: + async for event in workflow_handler.stream_events(): + # Agent step start + if isinstance(event, AgentInput): + # Start a new agent invocation + agent_run_id = str(uuid4()) + agent_invocation = AgentInvocation( + name=f"agent.{event.current_agent_name}", + run_id=agent_run_id, + input_context=str(event.input) if hasattr(event, 'input') and event.input else initial_message, + attributes={}, + ) + agent_invocation.framework = "llamaindex" + agent_invocation.agent_name = event.current_agent_name + + self._handler.start_agent(agent_invocation) + self._active_agents[agent_run_id] = agent_invocation + + # Tool call start + elif isinstance(event, WorkflowToolCall): + tool_call = ToolCall( + arguments=event.tool_kwargs, + name=event.tool_name, + id=event.tool_id, + attributes={}, + ) + tool_call.framework = "llamaindex" + + # Associate with current agent if available + if agent_invocation: + tool_call.agent_name = agent_invocation.agent_name + tool_call.agent_id = str(agent_invocation.run_id) + # Set parent_span explicitly - the agent span is the parent of this tool + if hasattr(agent_invocation, 'span') and agent_invocation.span: + tool_call.parent_span = agent_invocation.span + + self._handler.start_tool_call(tool_call) + self._active_tools[event.tool_id] = tool_call + + # Tool call end + elif isinstance(event, ToolCallResult): + tool_call = self._active_tools.get(event.tool_id) + if tool_call: + # Extract result + result = event.tool_output + if hasattr(result, 'content'): + tool_call.result = str(result.content) + else: + tool_call.result = str(result) + + self._handler.stop_tool_call(tool_call) + del self._active_tools[event.tool_id] + + # Agent step end (when no more tools to call) + elif isinstance(event, AgentOutput): + # Check if this is the final output (no tool calls) + if not event.tool_calls and agent_invocation: + # Extract response + if hasattr(event.response, 'content'): + agent_invocation.output_result = str(event.response.content) + else: + agent_invocation.output_result = str(event.response) + + self._handler.stop_agent(agent_invocation) + if agent_run_id: + del self._active_agents[agent_run_id] + agent_invocation = None + agent_run_id = None + + except Exception as e: + # Clean up any active spans on error + for tool_call in list(self._active_tools.values()): + from opentelemetry.util.genai.types import Error + error = Error(message=str(e), type=type(e)) + self._handler.fail_tool_call(tool_call, error) + self._active_tools.clear() + + if 
agent_invocation: + from opentelemetry.util.genai.types import Error + error = Error(message=str(e), type=type(e)) + self._handler.fail_agent(agent_invocation, error) + if agent_run_id: + del self._active_agents[agent_run_id] + + raise + + +def wrap_agent_run(wrapped, instance, args, kwargs): + """ + Wrap agent.run() to instrument workflow events. + + This function wraps the run() method of Workflow-based agents to capture + agent steps and tool calls via workflow event streaming. + """ + handler = wrapped(*args, **kwargs) + + # Get the initial user message + user_msg = kwargs.get('user_msg') or (args[0] if args else "") + + # Get TelemetryHandler from callback handler if available + from llama_index.core import Settings + telemetry_handler = None + for callback_handler in Settings.callback_manager.handlers: + if hasattr(callback_handler, '_handler'): + telemetry_handler = callback_handler._handler + break + + if telemetry_handler: + # Create workflow instrumentor + instrumentor = WorkflowEventInstrumentor(telemetry_handler) + + # Start background task to stream events + async def stream_events_background(): + try: + await instrumentor.instrument_workflow_handler(handler, str(user_msg)) + except Exception as e: + # Log error but don't crash the workflow + import logging + logger = logging.getLogger(__name__) + logger.warning(f"Error instrumenting workflow events: {e}") + + # Launch background task + asyncio.create_task(stream_events_background()) + + return handler diff --git a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/tests/test_embedding_instrumentation.py b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/tests/test_embedding_instrumentation.py new file mode 100644 index 0000000..355a057 --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/tests/test_embedding_instrumentation.py @@ -0,0 +1,151 @@ +"""Test embedding instrumentation for LlamaIndex.""" + +import os + +from llama_index.core import Settings +from llama_index.core.callbacks import CallbackManager +from llama_index.embeddings.openai import OpenAIEmbedding +from opentelemetry import metrics, trace +from opentelemetry.sdk.metrics import MeterProvider +from opentelemetry.sdk.metrics.export import InMemoryMetricReader +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor + +from opentelemetry.instrumentation.llamaindex import LlamaindexInstrumentor + + +# Global setup - shared across tests +metric_reader = None +instrumentor = None + + +def setup_telemetry(): + """Setup OpenTelemetry with span and metric exporters (once).""" + global metric_reader, instrumentor + + if metric_reader is not None: + return metric_reader + + # Enable metrics + os.environ["OTEL_INSTRUMENTATION_GENAI_EMITTERS"] = "span_metric" + + # Setup tracing + trace.set_tracer_provider(TracerProvider()) + trace.get_tracer_provider().add_span_processor( + SimpleSpanProcessor(ConsoleSpanExporter()) + ) + + # Setup metrics with InMemoryMetricReader + metric_reader = InMemoryMetricReader() + meter_provider = MeterProvider(metric_readers=[metric_reader]) + metrics.set_meter_provider(meter_provider) + + # Enable instrumentation once + instrumentor = LlamaindexInstrumentor() + instrumentor.instrument( + tracer_provider=trace.get_tracer_provider(), + meter_provider=metrics.get_meter_provider(), + ) + + return metric_reader + + +def test_embedding_single_text(): + """Test single text embedding instrumentation.""" + 
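    # Expect one embeddings span (dimension 1536 for text-embedding-3-small)
    # and a gen_ai.client.operation.duration data point for the call below.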
print("\nTest: Single Text Embedding") + print("=" * 60) + + metric_reader = setup_telemetry() + + # Configure embedding model + embed_model = OpenAIEmbedding( + model="text-embedding-3-small", + api_key=os.environ.get("OPENAI_API_KEY"), + ) + Settings.embed_model = embed_model + + # Make sure callback manager is initialized + if Settings.callback_manager is None: + Settings.callback_manager = CallbackManager() + + # Generate single embedding + text = "LlamaIndex is a data framework for LLM applications" + embedding = embed_model.get_text_embedding(text) + + print(f"\nText: {text}") + print(f"Embedding dimension: {len(embedding)}") + print(f"First 5 values: {embedding[:5]}") + + # Validate metrics + print("\nMetrics:") + metrics_data = metric_reader.get_metrics_data() + for resource_metric in metrics_data.resource_metrics: + for scope_metric in resource_metric.scope_metrics: + for metric in scope_metric.metrics: + print(f"\nMetric: {metric.name}") + for data_point in metric.data.data_points: + if hasattr(data_point, "bucket_counts"): + # Histogram + print(f" Count: {sum(data_point.bucket_counts)}") + else: + # Counter + print(f" Value: {data_point.value}") + + print("\nTest completed successfully") + + +def test_embedding_batch(): + """Test batch embedding instrumentation.""" + print("\nTest: Batch Embeddings") + print("=" * 60) + + metric_reader = setup_telemetry() + + # Configure embedding model + embed_model = OpenAIEmbedding( + model="text-embedding-3-small", + api_key=os.environ.get("OPENAI_API_KEY"), + ) + Settings.embed_model = embed_model + + # Make sure callback manager is initialized + if Settings.callback_manager is None: + Settings.callback_manager = CallbackManager() + + # Generate batch embeddings + texts = [ + "Paris is the capital of France", + "Berlin is the capital of Germany", + "Rome is the capital of Italy", + ] + embeddings = embed_model.get_text_embedding_batch(texts) + + print(f"\nEmbedded {len(embeddings)} texts") + print(f"Dimension: {len(embeddings[0])}") + + # Validate metrics + print("\nMetrics:") + metrics_data = metric_reader.get_metrics_data() + for resource_metric in metrics_data.resource_metrics: + for scope_metric in resource_metric.scope_metrics: + for metric in scope_metric.metrics: + print(f"\nMetric: {metric.name}") + for data_point in metric.data.data_points: + if hasattr(data_point, "bucket_counts"): + # Histogram + print(f" Count: {sum(data_point.bucket_counts)}") + else: + # Counter + print(f" Value: {data_point.value}") + + print("\nTest completed successfully") + + +if __name__ == "__main__": + test_embedding_single_text() + print("\n" + "=" * 60 + "\n") + test_embedding_batch() + + # Cleanup + if instrumentor: + instrumentor.uninstrument() diff --git a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/tests/test_llm_instrumentation.py b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/tests/test_llm_instrumentation.py new file mode 100644 index 0000000..50324c3 --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/tests/test_llm_instrumentation.py @@ -0,0 +1,190 @@ +"""Tests for LlamaIndex LLM instrumentation with OpenTelemetry.""" + +import os + +from llama_index.core.llms import ChatMessage, MessageRole +from llama_index.core.llms.mock import MockLLM +from opentelemetry import metrics, trace +from opentelemetry.instrumentation.llamaindex import LlamaindexInstrumentor +from opentelemetry.sdk.metrics import MeterProvider +from opentelemetry.sdk.metrics.export import InMemoryMetricReader 
+from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import ( + ConsoleSpanExporter, + SimpleSpanProcessor, +) +from opentelemetry.semconv._incubating.metrics import gen_ai_metrics + + +def setup_telemetry(): + """Setup OpenTelemetry with both trace and metrics exporters.""" + # Setup tracing + trace.set_tracer_provider(TracerProvider()) + tracer_provider = trace.get_tracer_provider() + tracer_provider.add_span_processor( + SimpleSpanProcessor(ConsoleSpanExporter()) + ) + + # Setup metrics with InMemoryMetricReader + metric_reader = InMemoryMetricReader() + meter_provider = MeterProvider(metric_readers=[metric_reader]) + metrics.set_meter_provider(meter_provider) + + return tracer_provider, meter_provider, metric_reader + + +def test_with_openai(): + """Test with real OpenAI API - requires OPENAI_API_KEY environment variable.""" + from llama_index.llms.openai import OpenAI + + print("=" * 80) + print("Testing with OpenAI API") + print("=" * 80) + + llm = OpenAI(model="gpt-3.5-turbo") + messages = [ + ChatMessage( + role=MessageRole.SYSTEM, content="You are a helpful assistant." + ), + ChatMessage( + role=MessageRole.USER, content="Say hello in exactly 5 words" + ), + ] + + response = llm.chat(messages) + print(f"\nResponse: {response.message.content}") + + if hasattr(response, "raw") and response.raw: + if isinstance(response.raw, dict): + usage = response.raw.get("usage", {}) + else: + usage = getattr(response.raw, "usage", None) + + if usage: + if isinstance(usage, dict): + prompt_tokens = usage.get("prompt_tokens") + completion_tokens = usage.get("completion_tokens") + total_tokens = usage.get("total_tokens") + else: + prompt_tokens = getattr(usage, "prompt_tokens", None) + completion_tokens = getattr(usage, "completion_tokens", None) + total_tokens = getattr(usage, "total_tokens", None) + + print(f"\nToken Usage: input={prompt_tokens}, output={completion_tokens}, total={total_tokens}") + + print("=" * 80) + + +class MockLLMWithUsage(MockLLM): + """MockLLM that includes fake usage data for testing.""" + + def _complete(self, prompt, **kwargs): + """Override internal complete to inject usage data.""" + response = super()._complete(prompt, **kwargs) + # Note: MockLLM uses _complete internally, but we can't easily inject + # usage here because the ChatResponse is created later + return response + + +def test_with_mock(): + """Test with MockLLM - no API key needed.""" + print("=" * 80) + print("Testing with MockLLM") + print("=" * 80) + + llm = MockLLM(max_tokens=50) + messages = [ + ChatMessage( + role=MessageRole.SYSTEM, content="You are a helpful assistant." 
+ ), + ChatMessage(role=MessageRole.USER, content="Say hello in 5 words"), + ] + + response = llm.chat(messages) + print(f"\nResponse: {response.message.content[:100]}...") + print("=" * 80) + + +def test_message_extraction(): + """Test message extraction.""" + print("\n" + "=" * 80) + print("Testing message extraction") + print("=" * 80) + + llm = MockLLM(max_tokens=20) + messages = [ + ChatMessage(role=MessageRole.SYSTEM, content="You are helpful."), + ChatMessage(role=MessageRole.USER, content="Test message"), + ] + + response = llm.chat(messages) + print(f"\nResponse: {response.message.content[:50]}...") + print("=" * 80) + + +if __name__ == "__main__": + # Enable metrics emission + os.environ["OTEL_INSTRUMENTATION_GENAI_EMITTERS"] = "span_metric" + + # Setup telemetry + tracer_provider, meter_provider, metric_reader = setup_telemetry() + + # Instrument LlamaIndex + instrumentor = LlamaindexInstrumentor() + instrumentor.instrument( + tracer_provider=tracer_provider, + meter_provider=meter_provider + ) + print("LlamaIndex instrumentation enabled\n") + + # Run tests + if os.environ.get("OPENAI_API_KEY"): + print("Testing with real OpenAI API\n") + test_with_openai() + else: + print("Testing with MockLLM (set OPENAI_API_KEY to test real API)\n") + test_with_mock() + + # Test message extraction + test_message_extraction() + + # Check metrics + print("\n" + "=" * 80) + print("Metrics Summary") + print("=" * 80) + + metrics_data = metric_reader.get_metrics_data() + found_duration = False + found_token_usage = False + + if metrics_data: + for rm in getattr(metrics_data, "resource_metrics", []) or []: + for scope in getattr(rm, "scope_metrics", []) or []: + for metric in getattr(scope, "metrics", []) or []: + print(f"\nMetric: {metric.name}") + + if metric.name == gen_ai_metrics.GEN_AI_CLIENT_OPERATION_DURATION: + found_duration = True + dps = getattr(metric.data, "data_points", []) + if dps: + print(f" Duration: {dps[0].sum:.4f} seconds") + print(f" Count: {dps[0].count}") + + if metric.name == gen_ai_metrics.GEN_AI_CLIENT_TOKEN_USAGE: + found_token_usage = True + dps = getattr(metric.data, "data_points", []) + for dp in dps: + token_type = dp.attributes.get("gen_ai.token.type", "unknown") + print(f" Token type: {token_type}, Sum: {dp.sum}, Count: {dp.count}") + + print("\n" + "=" * 80) + status = [] + if found_duration: + status.append("Duration: OK") + if found_token_usage: + status.append("Token Usage: OK") + if not found_duration and not found_token_usage: + status.append("No metrics (use real API for metrics)") + + print("Status: " + " | ".join(status)) + print("=" * 80) diff --git a/instrumentation-genai/opentelemetry-instrumentation-llamaindex/tests/test_workflow_agent.py b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/tests/test_workflow_agent.py new file mode 100644 index 0000000..ddfcfa5 --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-llamaindex/tests/test_workflow_agent.py @@ -0,0 +1,90 @@ +""" +Test Workflow-based agent instrumentation. + +This test validates that workflow event streaming captures agent steps and tool calls. 
+""" +import asyncio +from llama_index.core.agent import ReActAgent +from llama_index.core import Settings +from llama_index.llms.openai import OpenAI +from llama_index.core.tools import FunctionTool +from opentelemetry import trace +from opentelemetry.instrumentation.llamaindex import LlamaindexInstrumentor +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor + + +def multiply(a: int, b: int) -> int: + """Multiply two numbers.""" + return a * b + + +def add(a: int, b: int) -> int: + """Add two numbers.""" + return a + b + + +def setup_telemetry(): + """Setup OpenTelemetry with console exporter.""" + trace.set_tracer_provider(TracerProvider()) + tracer_provider = trace.get_tracer_provider() + tracer_provider.add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter())) + return tracer_provider + + +async def test_workflow_agent(): + """Test Workflow-based agent instrumentation.""" + + print("=" * 80) + print("Setting up telemetry...") + print("=" * 80) + tracer_provider = setup_telemetry() + + # Setup LlamaIndex + Settings.llm = OpenAI(model="gpt-4o-mini", temperature=0.1) + + # Instrument + print("\n" + "=" * 80) + print("Instrumenting LlamaIndex...") + print("=" * 80) + instrumentor = LlamaindexInstrumentor() + instrumentor.instrument(tracer_provider=tracer_provider) + + # Create tools + multiply_tool = FunctionTool.from_defaults(fn=multiply) + add_tool = FunctionTool.from_defaults(fn=add) + + print("\n" + "=" * 80) + print("Creating Workflow-based ReActAgent...") + print("=" * 80) + agent = ReActAgent(tools=[multiply_tool, add_tool], llm=Settings.llm, verbose=True) + + print("\n" + "=" * 80) + print("Running agent task (should see AgentInvocation -> ToolCall spans)...") + print("=" * 80) + + handler = agent.run(user_msg="Calculate 5 times 3, then add 2 to the result") + result = await handler + + # Give background instrumentation task time to complete + await asyncio.sleep(0.5) + + print("\n" + "=" * 80) + print("RESULTS") + print("=" * 80) + print(f"Response: {result.response.content}") + + print("\n" + "=" * 80) + print("✓ Test completed!") + print("=" * 80) + print("\nExpected trace structure:") + print(" AgentInvocation (gen_ai.agent.name=agent.Agent)") + print(" ├─ LLMInvocation") + print(" ├─ ToolCall (gen_ai.tool.name=multiply)") + print(" ├─ ToolCall (gen_ai.tool.name=add)") + print(" └─ LLMInvocation (final answer)") + print("=" * 80) + + +if __name__ == "__main__": + asyncio.run(test_workflow_agent())