18 changes: 18 additions & 0 deletions enterprise/litellm_enterprise/integrations/prometheus.py
@@ -298,6 +298,13 @@ def __init__(
self.get_labels_for_metric("litellm_deployment_failed_fallbacks"),
)

# Callback Logging Failure Metrics
self.litellm_callback_logging_failures_metric = self._counter_factory(
name="litellm_callback_logging_failures_metric",
documentation="Total number of failures when emitting logs to callbacks (e.g. s3_v2, langfuse, etc)",
labelnames=["callback_name"],
)

self.litellm_llm_api_failed_requests_metric = self._counter_factory(
name="litellm_llm_api_failed_requests_metric",
documentation="deprecated - use litellm_proxy_failed_requests_metric",
@@ -1723,6 +1730,17 @@ def increment_deployment_cooled_down(
litellm_model_name, model_id, api_base, api_provider, exception_status
).inc()

def increment_callback_logging_failure(
self,
callback_name: str,
):
"""
Increment metric when logging to a callback fails (e.g., s3_v2, langfuse, etc.)
"""
self.litellm_callback_logging_failures_metric.labels(
callback_name=callback_name
).inc()
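A minimal standalone sketch of how this counter behaves, using prometheus_client directly instead of LiteLLM's _counter_factory (the registry wiring is illustrative; the metric name, documentation, and label are taken from the diff above):

from prometheus_client import CollectorRegistry, Counter, generate_latest

# Stand-in for the counter created via self._counter_factory above.
registry = CollectorRegistry()
callback_failures = Counter(
    name="litellm_callback_logging_failures_metric",
    documentation="Total number of failures when emitting logs to callbacks",
    labelnames=["callback_name"],
    registry=registry,
)

# Equivalent of increment_callback_logging_failure(callback_name="S3Logger").
callback_failures.labels(callback_name="S3Logger").inc()

# prometheus_client appends a _total suffix on the scrape output, e.g.:
# litellm_callback_logging_failures_metric_total{callback_name="S3Logger"} 1.0
print(generate_latest(registry).decode())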

def track_provider_remaining_budget(
self, provider: str, spend: float, budget_limit: float
):
30 changes: 27 additions & 3 deletions litellm/containers/main.py
@@ -21,9 +21,6 @@
from litellm.types.utils import CallTypes
from litellm.utils import ProviderConfigManager, client

# Default model for container operations - can be any provider that supports containers
DEFAULT_CONTAINER_ENDPOINT_MODEL = "openai/gpt-4"

__all__ = [
"acreate_container",
"adelete_container",
@@ -386,6 +383,15 @@ def list_containers(
litellm_call_id: Optional[str] = kwargs.get("litellm_call_id")
_is_async = kwargs.pop("async_call", False) is True

# Check for mock response first
mock_response = kwargs.get("mock_response")
if mock_response is not None:
if isinstance(mock_response, str):
mock_response = json.loads(mock_response)

response = ContainerListResponse(**mock_response)
return response
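The same mock-response short-circuit is repeated in retrieve_container and delete_container below. A hedged usage sketch — it assumes list_containers is re-exported at the package level like the helpers in __all__ above, and the payload fields are illustrative rather than taken from ContainerListResponse's actual schema:

import json

import litellm

# Hypothetical payload; the real ContainerListResponse fields may differ.
mock = json.dumps({"object": "list", "data": []})

# The string is json.loads-ed and splatted into ContainerListResponse,
# so no provider credentials or network call are needed.
containers = litellm.list_containers(mock_response=mock)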

# get llm provider logic
litellm_params = GenericLiteLLMParams(**kwargs)
# get provider config
@@ -562,6 +568,15 @@ def retrieve_container(
litellm_call_id: Optional[str] = kwargs.get("litellm_call_id")
_is_async = kwargs.pop("async_call", False) is True

# Check for mock response first
mock_response = kwargs.get("mock_response")
if mock_response is not None:
if isinstance(mock_response, str):
mock_response = json.loads(mock_response)

response = ContainerObject(**mock_response)
return response

# get llm provider logic
litellm_params = GenericLiteLLMParams(**kwargs)
# get provider config
@@ -730,6 +745,15 @@ def delete_container(
litellm_call_id: Optional[str] = kwargs.get("litellm_call_id")
_is_async = kwargs.pop("async_call", False) is True

# Check for mock response first
mock_response = kwargs.get("mock_response")
if mock_response is not None:
if isinstance(mock_response, str):
mock_response = json.loads(mock_response)

response = DeleteContainerResult(**mock_response)
return response

# get llm provider logic
litellm_params = GenericLiteLLMParams(**kwargs)
# get provider config
23 changes: 23 additions & 0 deletions litellm/integrations/custom_logger.py
@@ -567,3 +567,26 @@ async def get_proxy_server_request_from_cold_storage_with_object_key(
Get the proxy server request from cold storage using the object key directly.
"""
pass

def handle_callback_failure(self, callback_name: str):
"""
Handle callback logging failures by incrementing Prometheus metrics.

Call this method in exception handlers within your callback when logging fails.
"""
try:
import litellm

Contributor: you can use logging_callback_manager to get you all the callbacks and then just run this

Contributor: agreed @Sameerlite can you please make the changes

Collaborator (author): Updated the code

all_callbacks = []
all_callbacks.extend(litellm.callbacks or []) # type: ignore
all_callbacks.extend(litellm._async_success_callback or []) # type: ignore
all_callbacks.extend(litellm.success_callback or []) # type: ignore

for callback_obj in all_callbacks:
if hasattr(callback_obj, 'increment_callback_logging_failure'):
callback_obj.increment_callback_logging_failure(callback_name=callback_name) # type: ignore
break

except Exception as e:
from litellm._logging import verbose_logger
verbose_logger.debug(f"Error in handle_callback_failure: {str(e)}")
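A minimal sketch of the intended call site — a hypothetical integration invoking handle_callback_failure from its own exception handler, mirroring the s3_v2 changes below (MyLogger and _ship_to_backend are illustrative names, not part of LiteLLM):

from litellm.integrations.custom_logger import CustomLogger

class MyLogger(CustomLogger):
    async def async_log_success_event(self, kwargs, response_obj, start_time, end_time):
        try:
            await self._ship_to_backend(kwargs, response_obj)  # hypothetical transport
        except Exception:
            # Non-blocking: record the failure in Prometheus and move on.
            self.handle_callback_failure(callback_name="MyLogger")

    async def _ship_to_backend(self, kwargs, response_obj):
        raise NotImplementedError  # placeholder for real upload logic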
4 changes: 3 additions & 1 deletion litellm/integrations/s3_v2.py
@@ -239,7 +239,7 @@ async def _async_log_event_base(self, kwargs, response_obj, start_time, end_time
)
except Exception as e:
verbose_logger.exception(f"s3 Layer Error - {str(e)}")
pass
self.handle_callback_failure(callback_name="S3Logger")
Contributor: which would be less work over time:

  • requiring each instance to implement this
  • OR having integrations just bubble the error and have litellm_logging handle this?
    @Sameerlite

Collaborator (author, @Sameerlite, Oct 31, 2025): @krrishdholakia The 2nd one is less work, but the problem is that periodic_flush and the methods it uses don't raise errors or propagate them to litellm_logging. Plus there are fire-and-forget tasks whose errors I couldn't find a way to bubble up. The approach I used makes sure that if an error occurs, it gets logged in Prometheus.

Contributor: @krrishdholakia is this good with you?

Contributor: ok, let's start here


async def async_upload_data_to_s3(
self, batch_logging_element: s3BatchLoggingElement
@@ -323,6 +323,7 @@ async def async_upload_data_to_s3(
response.raise_for_status()
except Exception as e:
verbose_logger.exception(f"Error uploading to s3: {str(e)}")
self.handle_callback_failure(callback_name="S3Logger")

async def async_send_batch(self):
"""
@@ -471,6 +472,7 @@ def upload_data_to_s3(self, batch_logging_element: s3BatchLoggingElement):
response.raise_for_status()
except Exception as e:
verbose_logger.exception(f"Error uploading to s3: {str(e)}")
self.handle_callback_failure(callback_name="S3Logger")

async def _download_object_from_s3(self, s3_object_key: str) -> Optional[dict]:
"""
46 changes: 45 additions & 1 deletion litellm/litellm_core_utils/litellm_logging.py
@@ -1,6 +1,7 @@
# What is this?
## Common Utility file for Logging handler
# Logging function -> log the exact model details + what's being sent | Non-Blocking
import asyncio
import copy
import datetime
import json
@@ -116,6 +117,7 @@
Usage,
)
from litellm.types.videos.main import VideoObject
from litellm.types.containers.main import ContainerObject
from litellm.utils import _get_base_model_from_metadata, executor, print_verbose

from ..integrations.argilla import ArgillaLogger
@@ -1622,6 +1624,7 @@ def _is_recognized_call_type_for_logging(
or isinstance(logging_result, dict)
and logging_result.get("object") == "vector_store.search_results.page"
or isinstance(logging_result, VideoObject)
or isinstance(logging_result, ContainerObject)
or (self.call_type == CallTypes.call_mcp_tool.value)
):
return True
@@ -2157,6 +2160,11 @@ def success_handler( # noqa: PLR0915
)
if capture_exception: # log this error to sentry for debugging
capture_exception(e)
# Track callback logging failures in Prometheus
try:
self._handle_callback_failure(callback=callback)
except Exception:
pass
except Exception as e:
verbose_logger.exception(
"LiteLLM.LoggingError: [Non-Blocking] Exception occurred while success logging {}".format(
@@ -2462,8 +2470,36 @@ async def async_success_handler( # noqa: PLR0915
verbose_logger.error(
f"LiteLLM.LoggingError: [Non-Blocking] Exception occurred while success logging {traceback.format_exc()}"
)
self._handle_callback_failure(callback=callback)
pass

def _handle_callback_failure(self, callback: Any):
"""
Handle callback logging failures by incrementing Prometheus metrics.

Works for both sync and async contexts since Prometheus counter increment is synchronous.

Args:
callback: The callback that failed
"""
try:
callback_name = self._get_callback_name(callback)

all_callbacks = []
all_callbacks.extend(litellm.callbacks or []) # type: ignore
all_callbacks.extend(litellm._async_success_callback or []) # type: ignore
all_callbacks.extend(litellm.success_callback or []) # type: ignore

for callback_obj in all_callbacks:
if hasattr(callback_obj, 'increment_callback_logging_failure'):
callback_obj.increment_callback_logging_failure(callback_name=callback_name) # type: ignore
break # Only increment once

except Exception as e:
verbose_logger.debug(
f"Error in _handle_callback_failure: {str(e)}"
)
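The dispatch can be sanity-checked without a running proxy. A sketch using a MagicMock in place of the PrometheusLogger — note that a MagicMock passes any hasattr check, which is exactly what makes it a convenient stand-in here:

from unittest.mock import MagicMock

import litellm

prometheus_stub = MagicMock()  # stands in for PrometheusLogger
litellm.callbacks = [prometheus_stub]  # overwrites globals; fine for a throwaway script
litellm.success_callback = []
litellm._async_success_callback = []

# Mirror of the dispatch loop in _handle_callback_failure above.
for cb in [*litellm.callbacks, *litellm._async_success_callback, *litellm.success_callback]:
    if hasattr(cb, "increment_callback_logging_failure"):
        cb.increment_callback_logging_failure(callback_name="langfuse")
        break  # only increment once

prometheus_stub.increment_callback_logging_failure.assert_called_once_with(
    callback_name="langfuse"
)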

def _failure_handler_helper_fn(
self, exception, traceback_exception, start_time=None, end_time=None
):
@@ -2799,6 +2835,10 @@ async def async_failure_handler(
str(e), callback
)
)
# Track callback logging failures in Prometheus.
# _handle_callback_failure is synchronous (the Prometheus counter increment
# does not await anything), so it is called directly rather than wrapped in
# asyncio.create_task, which requires a coroutine.
self._handle_callback_failure(callback=callback)

def _get_trace_id(self, service_name: Literal["langfuse"]) -> Optional[str]:
"""
Expand Down Expand Up @@ -2931,11 +2971,15 @@ def _get_callback_name(self, cb) -> str:
Helper to get the name of a callback function

Args:
cb: The callback function/string to get the name of
cb: The callback object/function/string to get the name of

Returns:
The name of the callback
"""
if isinstance(cb, str):
return cb
if hasattr(cb, "__name__"):
return cb.__name__
# Fall back to the class name for callback instances (e.g. PrometheusLogger);
# checked after __name__ so plain function callbacks keep their own names.
if hasattr(cb, "__class__"):
return cb.__class__.__name__
if hasattr(cb, "__func__"):
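A quick standalone check of the name-resolution order after this change — get_callback_name below mirrors the checks in _get_callback_name, and PrometheusLogger/my_log_fn are stand-ins defined only for the example:

def get_callback_name(cb) -> str:
    if isinstance(cb, str):
        return cb
    if hasattr(cb, "__name__"):
        return cb.__name__
    if hasattr(cb, "__class__"):
        return cb.__class__.__name__
    return str(cb)

class PrometheusLogger:  # stand-in for the real integration class
    pass

def my_log_fn():  # hypothetical function callback
    pass

assert get_callback_name("langfuse") == "langfuse"  # string callbacks
assert get_callback_name(my_log_fn) == "my_log_fn"  # plain functions
assert get_callback_name(PrometheusLogger()) == "PrometheusLogger"  # instances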
2 changes: 1 addition & 1 deletion litellm/llms/custom_httpx/llm_http_handler.py
@@ -6377,4 +6377,4 @@ async def async_text_to_speech_handler(
model=model,
raw_response=response,
logging_obj=logging_obj,
)
)
8 changes: 8 additions & 0 deletions litellm/proxy/common_request_processing.py
@@ -321,6 +321,10 @@ async def common_processing_pre_call_logic(
"avideo_status",
"avideo_content",
"avideo_remix",
"acreate_container",
"alist_containers",
"aretrieve_container",
"adelete_container",
],
version: Optional[str] = None,
user_model: Optional[str] = None,
@@ -419,6 +423,10 @@ async def base_process_llm_request(
"avideo_status",
"avideo_content",
"avideo_remix",
"acreate_container",
"alist_containers",
"aretrieve_container",
"adelete_container",
],
proxy_logging_obj: ProxyLogging,
general_settings: dict,
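With these route types registered, the proxy's container endpoints should be callable like any other OpenAI-compatible route. A hedged sketch — the path, port, and key below mirror OpenAI's /v1/containers layout and LiteLLM's usual proxy defaults, and are assumptions rather than values taken from this diff:

import httpx

resp = httpx.get(
    "http://localhost:4000/v1/containers",  # assumed local proxy address
    headers={"Authorization": "Bearer sk-1234"},  # assumed master key
)
resp.raise_for_status()
print(resp.json())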