Skip to content
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 71 additions & 3 deletions packages/traceloop-sdk/traceloop/sdk/decorators/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,12 +172,79 @@ def _setup_span(entity_name, tlp_span_kind, version):
return span, ctx, ctx_token


def _sanitize_for_serialization(obj):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider handling circular references in _sanitize_for_serialization to prevent infinite recursion when processing cyclic data structures.

"""
Recursively sanitize objects for JSON serialization by replacing
unpicklable objects with string representations.
"""
import dataclasses
import copy

# Handle None
if obj is None:
return None

# Handle primitive types
if isinstance(obj, (str, int, float, bool)):
return obj

# Handle lists and tuples
if isinstance(obj, (list, tuple)):
return type(obj)(
_sanitize_for_serialization(item) for item in obj
)

# Handle dictionaries
if isinstance(obj, dict):
return {
key: _sanitize_for_serialization(value)
for key, value in obj.items()
}

# Handle dataclasses - try to convert, catch unpicklable objects
if dataclasses.is_dataclass(obj):
try:
# Try shallow copy first to test if it's picklable
copy.copy(obj)
# If successful, try to convert to dict
obj_dict = {}
for field in dataclasses.fields(obj):
field_value = getattr(obj, field.name)
# Check if the field value is picklable
try:
copy.deepcopy(field_value)
obj_dict[field.name] = _sanitize_for_serialization(
field_value
)
except (TypeError, ValueError, AttributeError):
# If not picklable, use string representation
obj_dict[field.name] = (
f"<unpicklable: {type(field_value).__name__}>"
)
return obj_dict
except (TypeError, ValueError, AttributeError):
return f"<unpicklable dataclass: {type(obj).__name__}>"

# For other objects, try to serialize them
try:
copy.deepcopy(obj)
# If deepcopy succeeds, return as-is for JSON encoder
return obj
except (TypeError, ValueError, AttributeError):
# If not picklable, return a string representation
return f"<unpicklable: {type(obj).__name__}>"


def _handle_span_input(span, args, kwargs, cls=None):
"""Handles entity input logging in JSON for both sync and async functions"""
try:
if _should_send_prompts():

# Create a sanitized copy of args and kwargs that excludes unpicklable objects
sanitized_args = _sanitize_for_serialization(args)
sanitized_kwargs = _sanitize_for_serialization(kwargs)
json_input = json.dumps(
{"args": args, "kwargs": kwargs}, **({"cls": cls} if cls else {})
{"args": sanitized_args, "kwargs": sanitized_kwargs}, **({"cls": cls} if cls else {})
)
truncated_json = _truncate_json_if_needed(json_input)
span.set_attribute(
Expand All @@ -192,13 +259,14 @@ def _handle_span_output(span, res, cls=None):
"""Handles entity output logging in JSON for both sync and async functions"""
try:
if _should_send_prompts():
json_output = json.dumps(res, **({"cls": cls} if cls else {}))
sanitized_res = _sanitize_for_serialization(res)
json_output = json.dumps(sanitized_res, **({"cls": cls} if cls else {}))
truncated_json = _truncate_json_if_needed(json_output)
span.set_attribute(
SpanAttributes.TRACELOOP_ENTITY_OUTPUT,
truncated_json,
)
except TypeError as e:
except (TypeError, ValueError) as e:
Telemetry().log_exception(e)


Expand Down
Loading