Create Custom LiteLLM Callback Handler Maintained by AgentOps
Overview
Currently, AgentOps integrates with LiteLLM using LiteLLM's built-in "agentops" callback via litellm.success_callback = ["agentops"]. While this provides basic integration, it limits AgentOps' control over data collection, processing, and the observability experience.
This issue proposes creating a custom LiteLLM callback handler maintained by the AgentOps team, similar to our existing LangChain callback handler, to provide richer observability and better control over the integration.
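For reference, the current built-in integration is a single line:

```python
import litellm

# Today's integration: enable LiteLLM's bundled "agentops" success callback
litellm.success_callback = ["agentops"]
```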
Technical Requirements
LiteLLM Callback Interface
Based on LiteLLM's custom callback documentation, the callback handler must inherit from `litellm.integrations.custom_logger.CustomLogger` and implement the following methods:

```python
class AgentOpsLiteLLMHandler(CustomLogger):
    def log_pre_api_call(self, model, messages, kwargs):
        """Called before the API call is made"""
        pass

    def log_post_api_call(self, kwargs, response_obj, start_time, end_time):
        """Called after the API call completes"""
        pass

    def log_success_event(self, kwargs, response_obj, start_time, end_time):
        """Called on a successful API call"""
        pass

    def log_failure_event(self, kwargs, response_obj, start_time, end_time):
        """Called on a failed API call"""
        pass

    # Async versions
    async def async_log_success_event(self, kwargs, response_obj, start_time, end_time):
        """Async version of log_success_event"""
        pass

    async def async_log_failure_event(self, kwargs, response_obj, start_time, end_time):
        """Async version of log_failure_event"""
        pass
```

Data Recording Requirements
The AgentOps LiteLLM callback handler should record the following data:
LLM Request Data
- Model Information: Model name, provider, version
- Request Parameters: Temperature, max_tokens, top_p, frequency_penalty, presence_penalty
- Messages/Prompts: Input messages with roles (system, user, assistant)
- Streaming: Whether request uses streaming
- Function Calls: Tool/function call definitions and parameters
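To make the mapping concrete, here is a rough sketch of how these request fields could be collected into span attributes. The flat `llm.request.*` keys are illustrative (the real handler should use `agentops.semconv.SpanAttributes`), and the `optional_params` key follows LiteLLM's custom-callback docs but should be verified against the installed version:

```python
from agentops.helpers.serialization import safe_serialize

def extract_request_attributes(model, messages, kwargs):
    """Sketch: map a LiteLLM pre-call payload onto flat span attributes."""
    # LiteLLM keeps provider parameters under "optional_params" in callback
    # kwargs (per its custom-callback docs); fall back to top-level kwargs.
    params = kwargs.get("optional_params") or kwargs
    attributes = {
        "llm.request.model": model or "unknown",
        "llm.prompts": safe_serialize(messages),
        "llm.request.stream": params.get("stream", False),
    }
    for key in ("temperature", "max_tokens", "top_p", "frequency_penalty", "presence_penalty"):
        if params.get(key) is not None:
            attributes[f"llm.request.{key}"] = params[key]
    # Tool/function definitions, if present
    if params.get("tools") is not None:
        attributes["llm.request.functions"] = safe_serialize(params["tools"])
    return attributes
```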
LLM Response Data
- Completions: Generated text responses
- Token Usage: Prompt tokens, completion tokens, total tokens
- Model Response: Actual model used (may differ from requested)
- Finish Reason: stop, length, function_call, etc.
- Response Metadata: Response ID, created timestamp
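Similarly, a sketch of extracting these fields from a LiteLLM `ModelResponse`, which follows the OpenAI response shape (attribute names are assumed from that shape, and the keys are again illustrative):

```python
def extract_response_attributes(response_obj):
    """Sketch: map a LiteLLM ModelResponse onto flat span attributes."""
    attributes = {
        "llm.response.id": getattr(response_obj, "id", None),
        "llm.response.created": getattr(response_obj, "created", None),
        # The model actually used may differ from the one requested
        "llm.response.model": getattr(response_obj, "model", None),
    }
    choices = getattr(response_obj, "choices", []) or []
    attributes["llm.response.finish_reasons"] = [
        getattr(choice, "finish_reason", None) for choice in choices
    ]
    usage = getattr(response_obj, "usage", None)
    if usage is not None:
        attributes["llm.usage.prompt_tokens"] = getattr(usage, "prompt_tokens", None)
        attributes["llm.usage.completion_tokens"] = getattr(usage, "completion_tokens", None)
        attributes["llm.usage.total_tokens"] = getattr(usage, "total_tokens", None)
    return attributes
```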
Timing and Performance
- Start Time: Request initiation timestamp
- End Time: Response completion timestamp
- Duration: Total request duration
- Streaming Tokens: Individual token timing for streaming responses
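One subtlety worth calling out: LiteLLM passes `start_time` and `end_time` to the callbacks as `datetime` objects, so the duration should be converted to a plain number before being set as a span attribute. A minimal sketch:

```python
from datetime import datetime

def duration_seconds(start_time: datetime, end_time: datetime) -> float:
    """Total request duration in seconds from LiteLLM's datetime pair."""
    return (end_time - start_time).total_seconds()
```

Per-token timing for streaming responses would likely hook LiteLLM's stream-event callbacks (e.g. `log_stream_event` on `CustomLogger`, if present in the installed version).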
Error Handling
- Error Type: API errors, network errors, timeout errors
- Error Message: Detailed error description
- Status Code: HTTP status codes
- Retry Information: Retry attempts and backoff
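For failures, LiteLLM's custom-callback docs show the exception surfaced via `kwargs` (the `exception` and `traceback_exception` keys); a hedged sketch of turning that into attributes follows. Retry fields are left out here since their location in the payload would need to be confirmed:

```python
def extract_error_attributes(kwargs):
    """Sketch: map a LiteLLM failure payload onto flat span attributes."""
    exception = kwargs.get("exception")
    return {
        "llm.error.type": type(exception).__name__ if exception else "unknown",
        "llm.error.message": str(exception) if exception else "",
        # HTTP status codes are commonly carried on provider exceptions
        "llm.error.status_code": getattr(exception, "status_code", None),
        "llm.error.traceback": kwargs.get("traceback_exception"),
    }
```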
Integration with AgentOps Tracing
The callback handler should integrate with AgentOps' OpenTelemetry-based tracing system:
- Span Creation: Create spans using `agentops.sdk.core.tracer`
- Span Attributes: Use `agentops.semconv.SpanAttributes` for standardized attributes
- Parent-Child Relationships: Maintain proper span hierarchy
- Session Management: Integrate with AgentOps session/trace context
Reference Implementation Pattern
Follow the pattern established in our LangChain callback handler:
- Use OpenTelemetry spans for tracing
- Implement proper error handling and logging
- Support both sync and async operations
- Maintain span context and parent-child relationships
- Use AgentOps semantic conventions for attributes
Boilerplate Implementation
Basic Handler Structure
"""
LiteLLM callback handler for AgentOps.
This module provides the LiteLLM callback handler for AgentOps tracing and monitoring.
"""
from typing import Any, Dict, List, Optional, Union
import time
from opentelemetry import trace
from opentelemetry.context import attach, detach
from opentelemetry.trace import SpanContext, set_span_in_context
from agentops.helpers.serialization import safe_serialize
from agentops.logging import logger
from agentops.sdk.core import tracer
from agentops.semconv import SpanKind, SpanAttributes, CoreAttributes
try:
from litellm.integrations.custom_logger import CustomLogger
except ImportError:
logger.warning("LiteLLM not installed. LiteLLM callback handler will not be available.")
CustomLogger = object
class AgentOpsLiteLLMHandler(CustomLogger):
"""
AgentOps callback handler for LiteLLM.
This handler creates spans for LLM calls made through LiteLLM,
maintaining proper parent-child relationships with session as root span.
Args:
api_key (str, optional): AgentOps API key
tags (List[str], optional): Tags to add to the session
auto_session (bool, optional): Whether to automatically create a session span
"""
def __init__(
self,
api_key: Optional[str] = None,
tags: Optional[List[str]] = None,
auto_session: bool = True,
):
"""Initialize the callback handler."""
super().__init__()
self.active_spans = {}
self.api_key = api_key
self.tags = tags or []
self.session_span = None
self.session_token = None
self.context_tokens = {} # Store context tokens by request_id
# Initialize AgentOps
if auto_session:
self._initialize_agentops()
def _initialize_agentops(self):
"""Initialize AgentOps and create session span."""
import agentops
if not tracer.initialized:
init_kwargs = {
"auto_start_session": False,
"instrument_llm_calls": True,
}
if self.api_key:
init_kwargs["api_key"] = self.api_key
agentops.init(**init_kwargs)
logger.debug("AgentOps initialized from LiteLLM callback handler")
if not tracer.initialized:
logger.warning("AgentOps not initialized, session span will not be created")
return
otel_tracer = tracer.get_tracer()
span_name = f"session.{SpanKind.SESSION}"
attributes = {
SpanAttributes.AGENTOPS_SPAN_KIND: SpanKind.SESSION,
"session.tags": self.tags,
"agentops.operation.name": "session",
"span.kind": SpanKind.SESSION,
}
# Create a root session span
self.session_span = otel_tracer.start_span(span_name, attributes=attributes)
# Attach session span to the current context
self.session_token = attach(set_span_in_context(self.session_span))
logger.debug("Created session span as root span for LiteLLM")
def _create_span(
self,
operation_name: str,
span_kind: str,
request_id: str,
attributes: Optional[Dict[str, Any]] = None,
):
"""Create a span for the LLM operation."""
if not tracer.initialized:
logger.warning("AgentOps not initialized, spans will not be created")
return trace.NonRecordingSpan(SpanContext.INVALID)
otel_tracer = tracer.get_tracer()
span_name = f"{operation_name}.{span_kind}"
if attributes is None:
attributes = {}
attributes[SpanAttributes.AGENTOPS_SPAN_KIND] = span_kind
attributes["agentops.operation.name"] = operation_name
# Use session as parent context
parent_ctx = set_span_in_context(self.session_span)
span = otel_tracer.start_span(span_name, context=parent_ctx, attributes=attributes)
# Store span in active_spans
self.active_spans[request_id] = span
# Store token to detach later
token = attach(set_span_in_context(span))
self.context_tokens[request_id] = token
logger.debug(f"Started span: {span_name} for request: {request_id}")
return span
def _end_span(self, request_id: str):
"""End the span associated with the request_id."""
if request_id not in self.active_spans:
logger.warning(f"No span found for request {request_id}")
return
span = self.active_spans.pop(request_id)
token = self.context_tokens.pop(request_id, None)
if token is not None:
detach(token)
try:
span.end()
logger.debug(f"Ended span: {span.name}")
except Exception as e:
logger.warning(f"Error ending span: {e}")
def log_pre_api_call(self, model, messages, kwargs):
"""Called before API call is made."""
try:
request_id = kwargs.get("litellm_call_id", str(id(kwargs)))
# Extract model information
model_name = model or "unknown"
# Build attributes from request data
attributes = {
SpanAttributes.LLM_REQUEST_MODEL: model_name,
SpanAttributes.LLM_PROMPTS: safe_serialize(messages),
}
# Add request parameters
if "temperature" in kwargs:
attributes[SpanAttributes.LLM_REQUEST_TEMPERATURE] = kwargs["temperature"]
if "max_tokens" in kwargs:
attributes[SpanAttributes.LLM_REQUEST_MAX_TOKENS] = kwargs["max_tokens"]
if "top_p" in kwargs:
attributes[SpanAttributes.LLM_REQUEST_TOP_P] = kwargs["top_p"]
if "stream" in kwargs:
attributes["llm.request.stream"] = kwargs["stream"]
# Create span for this LLM call
self._create_span("llm", SpanKind.LLM, request_id, attributes)
logger.debug(f"Started LLM span for model: {model_name}")
except Exception as e:
logger.warning(f"Error in log_pre_api_call: {e}")
def log_success_event(self, kwargs, response_obj, start_time, end_time):
"""Called on successful API call."""
try:
request_id = kwargs.get("litellm_call_id", str(id(kwargs)))
if request_id not in self.active_spans:
logger.warning(f"No span found for successful request {request_id}")
return
span = self.active_spans.get(request_id)
# Add response data to span
if hasattr(response_obj, "choices") and response_obj.choices:
completions = []
for choice in response_obj.choices:
if hasattr(choice, "message") and hasattr(choice.message, "content"):
completions.append(choice.message.content)
if completions:
span.set_attribute(SpanAttributes.LLM_COMPLETIONS, safe_serialize(completions))
# Add token usage information
if hasattr(response_obj, "usage"):
usage = response_obj.usage
if hasattr(usage, "completion_tokens"):
span.set_attribute(SpanAttributes.LLM_USAGE_COMPLETION_TOKENS, usage.completion_tokens)
if hasattr(usage, "prompt_tokens"):
span.set_attribute(SpanAttributes.LLM_USAGE_PROMPT_TOKENS, usage.prompt_tokens)
if hasattr(usage, "total_tokens"):
span.set_attribute(SpanAttributes.LLM_USAGE_TOTAL_TOKENS, usage.total_tokens)
# Add timing information
duration = end_time - start_time
span.set_attribute("llm.request.duration", duration)
# Add model information from response
if hasattr(response_obj, "model"):
span.set_attribute(SpanAttributes.LLM_RESPONSE_MODEL, response_obj.model)
# End the span
self._end_span(request_id)
except Exception as e:
logger.warning(f"Error in log_success_event: {e}")
def log_failure_event(self, kwargs, response_obj, start_time, end_time):
"""Called on failed API call."""
try:
request_id = kwargs.get("litellm_call_id", str(id(kwargs)))
if request_id not in self.active_spans:
logger.warning(f"No span found for failed request {request_id}")
return
span = self.active_spans.get(request_id)
# Add error information to span
if hasattr(response_obj, "error"):
span.set_attribute("llm.error.message", str(response_obj.error))
# Add timing information
duration = end_time - start_time
span.set_attribute("llm.request.duration", duration)
# Set span status to error
span.set_status(trace.Status(trace.StatusCode.ERROR, str(response_obj)))
# End the span
self._end_span(request_id)
except Exception as e:
logger.warning(f"Error in log_failure_event: {e}")
# Async versions
async def async_log_success_event(self, kwargs, response_obj, start_time, end_time):
"""Async version of log_success_event."""
# For now, delegate to sync version
# In the future, could implement async-specific logic
self.log_success_event(kwargs, response_obj, start_time, end_time)
async def async_log_failure_event(self, kwargs, response_obj, start_time, end_time):
"""Async version of log_failure_event."""
# For now, delegate to sync version
# In the future, could implement async-specific logic
self.log_failure_event(kwargs, response_obj, start_time, end_time)
def __del__(self):
"""Clean up resources when the handler is deleted."""
try:
# End any remaining spans
for request_id in list(self.active_spans.keys()):
try:
self._end_span(request_id)
except Exception as e:
logger.warning(f"Error ending span during cleanup: {e}")
# End session span and detach session token
if self.session_span:
try:
if hasattr(self, "session_token") and self.session_token:
detach(self.session_token)
self.session_span.end()
logger.debug("Ended session span")
except Exception as e:
logger.warning(f"Error ending session span: {e}")
except Exception as e:
logger.warning(f"Error in __del__: {e}")Usage Example
```python
import litellm
import agentops

from agentops.integration.callbacks.litellm import AgentOpsLiteLLMHandler

# Initialize AgentOps
agentops.init()

# Create and register the callback handler
handler = AgentOpsLiteLLMHandler(tags=["litellm-example", "custom-handler"])

# Register the handler with LiteLLM
litellm.callbacks = [handler]

# Make LLM calls - they will be tracked automatically
response = litellm.completion(
    model="gpt-4",
    messages=[{"role": "user", "content": "Hello, how are you?"}],
)
print(response.choices[0].message.content)
```

Implementation Location
The callback handler should be implemented at:
- `agentops/integration/callbacks/litellm/callback.py`
- `agentops/integration/callbacks/litellm/__init__.py`
Following the same structure as the existing LangChain callback handler.
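A minimal `__init__.py` would just re-export the handler, mirroring the LangChain integration layout:

```python
# agentops/integration/callbacks/litellm/__init__.py
from agentops.integration.callbacks.litellm.callback import AgentOpsLiteLLMHandler

__all__ = ["AgentOpsLiteLLMHandler"]
```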
Benefits
- Enhanced Control: Full control over data collection and processing
- Richer Observability: More detailed span attributes and metadata
- Better Error Handling: Comprehensive error tracking and reporting
- Streaming Support: Proper handling of streaming responses
- Consistency: Aligned with other AgentOps callback handlers
- Extensibility: Easy to add new features and improvements
Acceptance Criteria
- Implement `AgentOpsLiteLLMHandler` class inheriting from `CustomLogger`
- Support both sync and async LLM calls
- Record all required LLM data (model, tokens, messages, timing, errors)
- Integrate with AgentOps OpenTelemetry tracing system
- Follow existing callback handler patterns and conventions
- Include comprehensive error handling and logging
- Add unit tests for the callback handler (a starting sketch follows this list)
- Update documentation with usage examples
- Ensure backward compatibility with existing LiteLLM integration
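To seed the unit-test criterion, here is a minimal pytest sketch; it imports from the proposed module path, so it only runs once the handler lands there:

```python
# tests/test_litellm_callback.py (illustrative)
from agentops.integration.callbacks.litellm import AgentOpsLiteLLMHandler

def test_end_span_with_unknown_request_id_does_not_raise():
    handler = AgentOpsLiteLLMHandler(auto_session=False)
    handler._end_span("missing-request-id")  # should only log a warning
    assert handler.active_spans == {}
    assert handler.context_tokens == {}
```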
Related Issues
- Reference existing LangChain callback handler: `agentops/integration/callbacks/langchain/callback.py`
- LiteLLM documentation: https://docs.litellm.ai/docs/observability/custom_callback
- AgentOps LiteLLM integration docs: https://docs.agentops.ai/v2/integrations/litellm