Create Custom LiteLLM Callback Handler Maintained by AgentOps #1180

@devin-ai-integration

Description

Overview

Currently, AgentOps integrates with LiteLLM using LiteLLM's built-in "agentops" callback via litellm.success_callback = ["agentops"]. While this provides basic integration, it limits AgentOps' control over data collection, processing, and the observability experience.

This issue proposes creating a custom LiteLLM callback handler maintained by the AgentOps team, similar to our existing LangChain callback handler, to provide richer observability and better control over the integration.
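
For concreteness, the two registration styles compare as follows (the handler import path is the one proposed under "Implementation Location" below and does not exist yet):

import litellm

# Current: LiteLLM's built-in "agentops" success callback
litellm.success_callback = ["agentops"]

# Proposed: an AgentOps-maintained handler instance registered directly,
# giving AgentOps full control over what gets recorded and how
from agentops.integration.callbacks.litellm import AgentOpsLiteLLMHandler

litellm.callbacks = [AgentOpsLiteLLMHandler()]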

Technical Requirements

LiteLLM Callback Interface

Based on LiteLLM's custom callback documentation, the callback handler must inherit from litellm.integrations.custom_logger.CustomLogger and implement the following methods:

class AgentOpsLiteLLMHandler(CustomLogger):
    def log_pre_api_call(self, model, messages, kwargs):
        """Called before API call is made"""
        pass
    
    def log_post_api_call(self, kwargs, response_obj, start_time, end_time):
        """Called after API call completes"""
        pass
    
    def log_success_event(self, kwargs, response_obj, start_time, end_time):
        """Called on successful API call"""
        pass
    
    def log_failure_event(self, kwargs, response_obj, start_time, end_time):
        """Called on failed API call"""
        pass
    
    # Async versions
    async def async_log_success_event(self, kwargs, response_obj, start_time, end_time):
        """Async version of log_success_event"""
        pass
    
    async def async_log_failure_event(self, kwargs, response_obj, start_time, end_time):
        """Async version of log_failure_event"""
        pass

Data Recording Requirements

The AgentOps LiteLLM callback handler should record the following data:

LLM Request Data

  • Model Information: Model name, provider, version
  • Request Parameters: Temperature, max_tokens, top_p, frequency_penalty, presence_penalty
  • Messages/Prompts: Input messages with roles (system, user, assistant)
  • Streaming: Whether request uses streaming
  • Function Calls: Tool/function call definitions and parameters
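
A rough sketch of pulling these fields out of the pre-call kwargs. This is hedged: depending on the LiteLLM version, generation parameters may sit at the top level of kwargs or nested under optional_params, so both locations are checked, and the attribute names are illustrative rather than final AgentOps conventions.

def extract_request_attributes(model, messages, kwargs):
    """Collect request attributes from LiteLLM's pre-call kwargs (sketch)."""
    # Parameter location varies across LiteLLM versions; merge both candidates.
    params = {**kwargs.get("optional_params", {}), **kwargs}
    attributes = {
        "llm.request.model": model or "unknown",
        "llm.request.stream": bool(params.get("stream", False)),
    }
    for name in ("temperature", "max_tokens", "top_p", "frequency_penalty", "presence_penalty"):
        if params.get(name) is not None:
            attributes[f"llm.request.{name}"] = params[name]
    # Tool/function-call definitions, if supplied with the request
    tools = params.get("tools") or params.get("functions")
    if tools:
        attributes["llm.request.tools"] = str(tools)
    return attributes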

LLM Response Data

  • Completions: Generated text responses
  • Token Usage: Prompt tokens, completion tokens, total tokens
  • Model Response: Actual model used (may differ from requested)
  • Finish Reason: stop, length, function_call, etc.
  • Response Metadata: Response ID, created timestamp
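
The boilerplate below records completions, token usage, and the response model; finish reasons and response metadata can be captured along the same lines (a sketch with illustrative attribute names):

def record_response_metadata(span, response_obj):
    """Attach finish reasons and response metadata to an existing span (sketch)."""
    for i, choice in enumerate(getattr(response_obj, "choices", None) or []):
        finish_reason = getattr(choice, "finish_reason", None)
        if finish_reason:
            span.set_attribute(f"llm.response.choices.{i}.finish_reason", finish_reason)
    if getattr(response_obj, "id", None):
        span.set_attribute("llm.response.id", response_obj.id)
    if getattr(response_obj, "created", None):
        span.set_attribute("llm.response.created", response_obj.created)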

Timing and Performance

  • Start Time: Request initiation timestamp
  • End Time: Response completion timestamp
  • Duration: Total request duration
  • Streaming Tokens: Individual token timing for streaming responses
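
For per-token timing, LiteLLM's CustomLogger also exposes stream hooks (log_stream_event and async_log_stream_event, invoked per chunk); a minimal sketch, to be verified against the installed LiteLLM version:

import time

def log_stream_event(self, kwargs, response_obj, start_time, end_time):
    """Record one span event per streamed chunk (sketch; added as a handler method)."""
    request_id = kwargs.get("litellm_call_id", str(id(kwargs)))
    span = self.active_spans.get(request_id)
    if span is not None:
        # The event timestamp captures when this chunk arrived
        span.add_event("llm.stream.chunk", {"llm.stream.received_at": time.time()})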

Error Handling

  • Error Type: API errors, network errors, timeout errors
  • Error Message: Detailed error description
  • Status Code: HTTP status codes
  • Retry Information: Retry attempts and backoff
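
In the failure hooks, LiteLLM's custom-callback documentation describes the raised exception being passed through kwargs; a hedged sketch (verify the exact keys against the installed version):

def record_error(span, kwargs):
    """Attach error details from LiteLLM failure kwargs to a span (sketch)."""
    exc = kwargs.get("exception")
    if exc is not None:
        span.set_attribute("llm.error.type", type(exc).__name__)
        span.set_attribute("llm.error.message", str(exc))
        # LiteLLM maps provider errors to OpenAI-style exceptions, which usually
        # carry an HTTP status_code (assumption: not guaranteed on every error)
        status_code = getattr(exc, "status_code", None)
        if status_code is not None:
            span.set_attribute("llm.error.status_code", status_code)
    tb = kwargs.get("traceback_exception")
    if tb:
        span.set_attribute("llm.error.traceback", str(tb))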

Integration with AgentOps Tracing

The callback handler should integrate with AgentOps' OpenTelemetry-based tracing system:

  1. Span Creation: Create spans using agentops.sdk.core.tracer
  2. Span Attributes: Use agentops.semconv.SpanAttributes for standardized attributes
  3. Parent-Child Relationships: Maintain proper span hierarchy
  4. Session Management: Integrate with AgentOps session/trace context
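
A condensed sketch of that pattern, using the same tracer and SpanAttributes accessors that appear in the boilerplate below:

from agentops.sdk.core import tracer
from agentops.semconv import SpanAttributes

otel_tracer = tracer.get_tracer()
with otel_tracer.start_as_current_span(
    "llm.completion",
    attributes={SpanAttributes.LLM_REQUEST_MODEL: "gpt-4"},
) as span:
    ...  # make the LiteLLM call; spans created inside inherit this context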

Reference Implementation Pattern

Follow the pattern established in our LangChain callback handler:

  • Use OpenTelemetry spans for tracing
  • Implement proper error handling and logging
  • Support both sync and async operations
  • Maintain span context and parent-child relationships
  • Use AgentOps semantic conventions for attributes

Boilerplate Implementation

Basic Handler Structure

"""
LiteLLM callback handler for AgentOps.

This module provides the LiteLLM callback handler for AgentOps tracing and monitoring.
"""

from typing import Any, Dict, List, Optional

from opentelemetry import trace
from opentelemetry.context import attach, detach
from opentelemetry.trace import set_span_in_context

from agentops.helpers.serialization import safe_serialize
from agentops.logging import logger
from agentops.sdk.core import tracer
from agentops.semconv import SpanKind, SpanAttributes

try:
    from litellm.integrations.custom_logger import CustomLogger
except ImportError:
    logger.warning("LiteLLM not installed. LiteLLM callback handler will not be available.")
    CustomLogger = object


class AgentOpsLiteLLMHandler(CustomLogger):
    """
    AgentOps callback handler for LiteLLM.
    
    This handler creates spans for LLM calls made through LiteLLM,
    maintaining proper parent-child relationships with session as root span.
    
    Args:
        api_key (str, optional): AgentOps API key
        tags (List[str], optional): Tags to add to the session
        auto_session (bool, optional): Whether to automatically create a session span
    """
    
    def __init__(
        self,
        api_key: Optional[str] = None,
        tags: Optional[List[str]] = None,
        auto_session: bool = True,
    ):
        """Initialize the callback handler."""
        super().__init__()
        self.active_spans = {}
        self.api_key = api_key
        self.tags = tags or []
        self.session_span = None
        self.session_token = None
        self.context_tokens = {}  # Store context tokens by request_id
        
        # Initialize AgentOps
        if auto_session:
            self._initialize_agentops()
    
    def _initialize_agentops(self):
        """Initialize AgentOps and create session span."""
        import agentops
        
        if not tracer.initialized:
            init_kwargs = {
                "auto_start_session": False,
                "instrument_llm_calls": True,
            }
            
            if self.api_key:
                init_kwargs["api_key"] = self.api_key
            
            agentops.init(**init_kwargs)
            logger.debug("AgentOps initialized from LiteLLM callback handler")
        
        if not tracer.initialized:
            logger.warning("AgentOps not initialized, session span will not be created")
            return
        
        otel_tracer = tracer.get_tracer()
        
        span_name = f"session.{SpanKind.SESSION}"
        
        attributes = {
            SpanAttributes.AGENTOPS_SPAN_KIND: SpanKind.SESSION,
            "session.tags": self.tags,
            "agentops.operation.name": "session",
            "span.kind": SpanKind.SESSION,
        }
        
        # Create a root session span
        self.session_span = otel_tracer.start_span(span_name, attributes=attributes)
        
        # Attach session span to the current context
        self.session_token = attach(set_span_in_context(self.session_span))
        
        logger.debug("Created session span as root span for LiteLLM")
    
    def _create_span(
        self,
        operation_name: str,
        span_kind: str,
        request_id: str,
        attributes: Optional[Dict[str, Any]] = None,
    ):
        """Create a span for the LLM operation."""
        if not tracer.initialized:
            logger.warning("AgentOps not initialized, spans will not be created")
            return trace.NonRecordingSpan(trace.INVALID_SPAN_CONTEXT)
        
        otel_tracer = tracer.get_tracer()
        
        span_name = f"{operation_name}.{span_kind}"
        
        if attributes is None:
            attributes = {}
        
        attributes[SpanAttributes.AGENTOPS_SPAN_KIND] = span_kind
        attributes["agentops.operation.name"] = operation_name
        
        # Use session as parent context
        parent_ctx = set_span_in_context(self.session_span)
        span = otel_tracer.start_span(span_name, context=parent_ctx, attributes=attributes)
        
        # Store span in active_spans
        self.active_spans[request_id] = span
        
        # Store token to detach later
        token = attach(set_span_in_context(span))
        self.context_tokens[request_id] = token
        
        logger.debug(f"Started span: {span_name} for request: {request_id}")
        return span
    
    def _end_span(self, request_id: str):
        """End the span associated with the request_id."""
        if request_id not in self.active_spans:
            logger.warning(f"No span found for request {request_id}")
            return
        
        span = self.active_spans.pop(request_id)
        token = self.context_tokens.pop(request_id, None)
        
        if token is not None:
            detach(token)
        
        try:
            span.end()
            logger.debug(f"Ended span: {span.name}")
        except Exception as e:
            logger.warning(f"Error ending span: {e}")
    
    def log_pre_api_call(self, model, messages, kwargs):
        """Called before API call is made."""
        try:
            request_id = kwargs.get("litellm_call_id", str(id(kwargs)))
            
            # Extract model information
            model_name = model or "unknown"
            
            # Build attributes from request data
            attributes = {
                SpanAttributes.LLM_REQUEST_MODEL: model_name,
                SpanAttributes.LLM_PROMPTS: safe_serialize(messages),
            }
            
            # Add request parameters
            if "temperature" in kwargs:
                attributes[SpanAttributes.LLM_REQUEST_TEMPERATURE] = kwargs["temperature"]
            if "max_tokens" in kwargs:
                attributes[SpanAttributes.LLM_REQUEST_MAX_TOKENS] = kwargs["max_tokens"]
            if "top_p" in kwargs:
                attributes[SpanAttributes.LLM_REQUEST_TOP_P] = kwargs["top_p"]
            if "stream" in kwargs:
                attributes["llm.request.stream"] = kwargs["stream"]
            
            # Create span for this LLM call
            self._create_span("llm", SpanKind.LLM, request_id, attributes)
            
            logger.debug(f"Started LLM span for model: {model_name}")
            
        except Exception as e:
            logger.warning(f"Error in log_pre_api_call: {e}")
    
    def log_success_event(self, kwargs, response_obj, start_time, end_time):
        """Called on successful API call."""
        try:
            request_id = kwargs.get("litellm_call_id", str(id(kwargs)))
            
            if request_id not in self.active_spans:
                logger.warning(f"No span found for successful request {request_id}")
                return
            
            span = self.active_spans.get(request_id)
            
            # Add response data to span
            if hasattr(response_obj, "choices") and response_obj.choices:
                completions = []
                for choice in response_obj.choices:
                    if hasattr(choice, "message") and hasattr(choice.message, "content"):
                        completions.append(choice.message.content)
                
                if completions:
                    span.set_attribute(SpanAttributes.LLM_COMPLETIONS, safe_serialize(completions))
            
            # Add token usage information
            if hasattr(response_obj, "usage"):
                usage = response_obj.usage
                if hasattr(usage, "completion_tokens"):
                    span.set_attribute(SpanAttributes.LLM_USAGE_COMPLETION_TOKENS, usage.completion_tokens)
                if hasattr(usage, "prompt_tokens"):
                    span.set_attribute(SpanAttributes.LLM_USAGE_PROMPT_TOKENS, usage.prompt_tokens)
                if hasattr(usage, "total_tokens"):
                    span.set_attribute(SpanAttributes.LLM_USAGE_TOTAL_TOKENS, usage.total_tokens)
            
            # Add timing information (LiteLLM passes datetime objects here)
            duration = (end_time - start_time).total_seconds()
            span.set_attribute("llm.request.duration", duration)
            
            # Add model information from response
            if hasattr(response_obj, "model"):
                span.set_attribute(SpanAttributes.LLM_RESPONSE_MODEL, response_obj.model)
            
            # End the span
            self._end_span(request_id)
            
        except Exception as e:
            logger.warning(f"Error in log_success_event: {e}")
    
    def log_failure_event(self, kwargs, response_obj, start_time, end_time):
        """Called on failed API call."""
        try:
            request_id = kwargs.get("litellm_call_id", str(id(kwargs)))
            
            if request_id not in self.active_spans:
                logger.warning(f"No span found for failed request {request_id}")
                return
            
            span = self.active_spans.get(request_id)
            
            # Add error information to span
            if hasattr(response_obj, "error"):
                span.set_attribute("llm.error.message", str(response_obj.error))
            
            # Add timing information (LiteLLM passes datetime objects here)
            duration = (end_time - start_time).total_seconds()
            span.set_attribute("llm.request.duration", duration)
            
            # Set span status to error
            span.set_status(trace.Status(trace.StatusCode.ERROR, str(response_obj)))
            
            # End the span
            self._end_span(request_id)
            
        except Exception as e:
            logger.warning(f"Error in log_failure_event: {e}")
    
    # Async versions
    async def async_log_success_event(self, kwargs, response_obj, start_time, end_time):
        """Async version of log_success_event."""
        # For now, delegate to sync version
        # In the future, could implement async-specific logic
        self.log_success_event(kwargs, response_obj, start_time, end_time)
    
    async def async_log_failure_event(self, kwargs, response_obj, start_time, end_time):
        """Async version of log_failure_event."""
        # For now, delegate to sync version
        # In the future, could implement async-specific logic
        self.log_failure_event(kwargs, response_obj, start_time, end_time)
    
    def __del__(self):
        """Clean up resources when the handler is deleted."""
        try:
            # End any remaining spans
            for request_id in list(self.active_spans.keys()):
                try:
                    self._end_span(request_id)
                except Exception as e:
                    logger.warning(f"Error ending span during cleanup: {e}")
            
            # End session span and detach session token
            if self.session_span:
                try:
                    if hasattr(self, "session_token") and self.session_token:
                        detach(self.session_token)
                    
                    self.session_span.end()
                    logger.debug("Ended session span")
                except Exception as e:
                    logger.warning(f"Error ending session span: {e}")
        
        except Exception as e:
            logger.warning(f"Error in __del__: {e}")

Usage Example

import litellm
import agentops
from agentops.integration.callbacks.litellm import AgentOpsLiteLLMHandler

# Initialize AgentOps
agentops.init()

# Create and register the callback handler
handler = AgentOpsLiteLLMHandler(
    tags=["litellm-example", "custom-handler"]
)

# Register the handler with LiteLLM
litellm.callbacks = [handler]

# Make LLM calls - they will be automatically tracked
response = litellm.completion(
    model="gpt-4",
    messages=[{"role": "user", "content": "Hello, how are you?"}]
)

print(response.choices[0].message.content)
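
Async calls are handled by the same registered instance through the async_* hooks; for example, with litellm.acompletion:

import asyncio

async def main():
    response = await litellm.acompletion(
        model="gpt-4",
        messages=[{"role": "user", "content": "Hello, how are you?"}],
    )
    print(response.choices[0].message.content)

asyncio.run(main())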

Implementation Location

The callback handler should be implemented at:

  • agentops/integration/callbacks/litellm/callback.py
  • agentops/integration/callbacks/litellm/__init__.py

These files should follow the same structure as the existing LangChain callback handler.
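
A minimal __init__.py that re-exports the handler, mirroring the (assumed) LangChain package layout:

# agentops/integration/callbacks/litellm/__init__.py
from agentops.integration.callbacks.litellm.callback import AgentOpsLiteLLMHandler

__all__ = ["AgentOpsLiteLLMHandler"]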

Benefits

  1. Enhanced Control: Full control over data collection and processing
  2. Richer Observability: More detailed span attributes and metadata
  3. Better Error Handling: Comprehensive error tracking and reporting
  4. Streaming Support: Proper handling of streaming responses
  5. Consistency: Aligned with other AgentOps callback handlers
  6. Extensibility: Easy to add new features and improvements

Acceptance Criteria

  • Implement AgentOpsLiteLLMHandler class inheriting from CustomLogger
  • Support both sync and async LLM calls
  • Record all required LLM data (model, tokens, messages, timing, errors)
  • Integrate with AgentOps OpenTelemetry tracing system
  • Follow existing callback handler patterns and conventions
  • Include comprehensive error handling and logging
  • Add unit tests for the callback handler (a starter sketch follows this list)
  • Update documentation with usage examples
  • Ensure backward compatibility with existing LiteLLM integration
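
As a starting point for the unit tests referenced above, a pytest sketch that exercises span bookkeeping without touching the tracer (auto_session=False) or any backend; the import path is the proposed one:

import datetime
from unittest.mock import MagicMock

from agentops.integration.callbacks.litellm import AgentOpsLiteLLMHandler

def test_success_event_ends_span():
    handler = AgentOpsLiteLLMHandler(auto_session=False)
    fake_span = MagicMock()
    handler.active_spans["call-1"] = fake_span

    kwargs = {"litellm_call_id": "call-1"}
    response = MagicMock(choices=[], usage=None, model="gpt-4")
    start = datetime.datetime.now()
    end = datetime.datetime.now()

    handler.log_success_event(kwargs, response, start, end)

    # The span is ended and removed from the handler's bookkeeping
    fake_span.end.assert_called_once()
    assert "call-1" not in handler.active_spans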
