diff --git a/src/galileo/openai.py b/src/galileo/openai.py
index 014987a8..646023cd 100644
--- a/src/galileo/openai.py
+++ b/src/galileo/openai.py
@@ -39,6 +39,7 @@
 import logging
 import types
+import json
 from collections import defaultdict
 from collections.abc import Generator, Iterable
 from dataclasses import dataclass
@@ -50,7 +51,7 @@
 from pydantic import BaseModel
 from wrapt import wrap_function_wrapper  # type: ignore[import-untyped]
 
-from galileo import GalileoLogger
+from galileo import GalileoLogger, Message, MessageRole, ToolCall, ToolCallFunction
 from galileo.decorator import galileo_context
 from galileo.utils import _get_timestamp
 from galileo.utils.serialization import serialize_to_str
@@ -103,7 +104,10 @@ class OpenAiInputData:
 OPENAI_CLIENT_METHODS = [
     OpenAiModuleDefinition(
         module="openai.resources.chat.completions", object="Completions", method="create", type="chat", sync=True
-    )
+    ),
+    OpenAiModuleDefinition(
+        module="openai.resources.responses", object="Responses", method="create", type="response", sync=True
+    ),
     # Eventually add more OpenAI client library methods here
 ]
 
@@ -153,6 +157,75 @@ def wrapper(wrapped: Callable, instance: Any, args: dict, kwargs: dict) -> Any:
     return _with_galileo
 
 
+def _convert_to_galileo_message(data: Any, default_role: str = "user") -> Message:
+    """Convert OpenAI response data to a Galileo Message object."""
+    if hasattr(data, "type") and data.type == "function_call":
+        tool_call = ToolCall(
+            id=getattr(data, "call_id", ""),
+            function=ToolCallFunction(name=getattr(data, "name", ""), arguments=getattr(data, "arguments", "")),
+        )
+        return Message(content="", role=MessageRole.assistant, tool_calls=[tool_call])
+
+    if isinstance(data, dict) and data.get("type") == "function_call_output":
+        output = data.get("output", "")
+        if isinstance(output, dict):
+            content = json.dumps(output)
+        else:
+            content = str(output)
+
+        return Message(content=content, role=MessageRole.tool, tool_call_id=data.get("call_id", ""))
+
+    # Handle ChatCompletionMessage objects (from the completions API) and dictionary messages
+    if (hasattr(data, "role") and hasattr(data, "content")) or isinstance(data, dict):
+        # Extract role and content from either object type
+        if hasattr(data, "role"):
+            # ChatCompletionMessage object
+            role = getattr(data, "role", default_role)
+            content = getattr(data, "content", "")
+            tool_calls = getattr(data, "tool_calls", None)
+            tool_call_id = getattr(data, "tool_call_id", None)
+        else:
+            # Dictionary message
+            role = data.get("role", default_role)
+            content = data.get("content", "")
+            tool_calls = data.get("tool_calls")
+            tool_call_id = data.get("tool_call_id")
+
+        # Handle tool calls if present
+        galileo_tool_calls = None
+        if tool_calls:
+            galileo_tool_calls = []
+            for tc in tool_calls:
+                if hasattr(tc, "function"):
+                    # ChatCompletionMessageFunctionToolCall object
+                    galileo_tool_calls.append(
+                        ToolCall(
+                            id=getattr(tc, "id", ""),
+                            function=ToolCallFunction(
+                                name=getattr(tc.function, "name", ""), arguments=getattr(tc.function, "arguments", "")
+                            ),
+                        )
+                    )
+                elif isinstance(tc, dict) and "function" in tc:
+                    # Dictionary tool call
+                    galileo_tool_calls.append(
+                        ToolCall(
+                            id=tc.get("id", ""),
+                            function=ToolCallFunction(
+                                name=tc["function"].get("name", ""), arguments=tc["function"].get("arguments", "")
+                            ),
+                        )
+                    )
+
+        return Message(
+            content=str(content) if content is not None else "",
+            role=MessageRole(role),
+            tool_calls=galileo_tool_calls,
+            tool_call_id=tool_call_id,
+        )
+
+    return Message(content=str(data), role=MessageRole(default_role))
+
+
 def _extract_chat_response(kwargs: dict) -> dict:
     """Extracts the llm output from the response."""
     response = {"role": kwargs.get("role")}
@@ -213,6 +286,10 @@ def _extract_input_data_from_kwargs(
         prompt = kwargs.get("prompt")
     elif resource.type == "chat":
         prompt = kwargs.get("messages", [])
+    elif resource.type == "response":
+        prompt = kwargs.get("input", "")
+        # TODO: parse instructions from kwargs
+        # https://platform.openai.com/docs/guides/text#message-roles-and-instruction-following
 
     parsed_temperature = float(
         kwargs.get("temperature", 1) if not isinstance(kwargs.get("temperature", 1), NotGiven) else 1
@@ -242,6 +319,11 @@ def _extract_input_data_from_kwargs(
     parsed_tool_choice = kwargs.get("tool_choice") if not isinstance(kwargs.get("tool_choice"), NotGiven) else None
 
+    # Extract reasoning parameters for the Responses API
+    reasoning = kwargs.get("reasoning") if resource.type == "response" else None
+    parsed_reasoning_effort = reasoning.get("effort") if reasoning else None
+    parsed_reasoning_verbosity = reasoning.get("summary") if reasoning else None
+    parsed_reasoning_generate_summary = reasoning.get("generate_summary") if reasoning else None
     # handle deprecated aliases (functions for tools, function_call for tool_choice)
     if parsed_tools is None and kwargs.get("functions") is not None:
         parsed_tools = kwargs["functions"]
@@ -263,6 +345,14 @@ def _extract_input_data_from_kwargs(
     if parsed_seed is not None:
         model_parameters["seed"] = parsed_seed
 
+    # Add reasoning parameters to model parameters
+    if parsed_reasoning_effort is not None:
+        model_parameters["reasoning_effort"] = parsed_reasoning_effort
+    if parsed_reasoning_verbosity is not None:
+        model_parameters["reasoning_verbosity"] = parsed_reasoning_verbosity
+    if parsed_reasoning_generate_summary is not None:
+        model_parameters["reasoning_generate_summary"] = parsed_reasoning_generate_summary
+
     return OpenAiInputData(
         name=name,
         metadata=metadata,
@@ -283,6 +373,17 @@ def _parse_usage(usage: Optional[dict] = None) -> Optional[dict]:
 
     usage_dict = usage.copy() if isinstance(usage, dict) else usage.__dict__
 
+    # Handle Responses API field names (input_tokens/output_tokens) vs Chat Completions (prompt_tokens/completion_tokens)
+    if "input_tokens" in usage_dict:
+        usage_dict["prompt_tokens"] = usage_dict.pop("input_tokens")
+    if "output_tokens" in usage_dict:
+        usage_dict["completion_tokens"] = usage_dict.pop("output_tokens")
+
+    if "input_tokens_details" in usage_dict:
+        usage_dict["prompt_tokens_details"] = usage_dict.pop("input_tokens_details")
+    if "output_tokens_details" in usage_dict:
+        usage_dict["completion_tokens_details"] = usage_dict.pop("output_tokens_details")
+
     for tokens_details in ["prompt_tokens_details", "completion_tokens_details"]:
         if tokens_details in usage_dict and usage_dict[tokens_details] is not None:
             tokens_details_dict = (
@@ -295,6 +396,315 @@ def _parse_usage(usage: Optional[dict] = None) -> Optional[dict]:
     return usage_dict
 
 
+def _extract_reasoning_content(item: Any) -> str:
+    """Extract reasoning content from a reasoning item. Combines multiple summary items into a single string."""
+    summary = getattr(item, "summary", [])
+    if isinstance(summary, list) and summary:
+        reasoning_texts = []
+        for summary_item in summary:
+            if hasattr(summary_item, "text"):
+                reasoning_texts.append(summary_item.text)
+            elif isinstance(summary_item, dict) and "text" in summary_item:
+                reasoning_texts.append(summary_item["text"])
+        return "\n\n".join(reasoning_texts)
+    return ""
+
+
+def _extract_message_content(item: Any) -> str:
+    """Extract message content from a message item."""
+    content = getattr(item, "content", [])
+    if isinstance(content, list):
+        text_parts = []
+        for content_item in content:
+            if hasattr(content_item, "text"):
+                text_parts.append(content_item.text)
+            elif isinstance(content_item, dict) and "text" in content_item:
+                text_parts.append(content_item["text"])
+        return "".join(text_parts)
+    return str(content) if content else ""
+
+
+def _process_output_items(
+    output_items: list,
+    galileo_logger: GalileoLogger,
+    model: Optional[str] = None,
+    original_input: Optional[list] = None,
+    model_parameters: Optional[dict] = None,
+) -> list:
+    """
+    The Responses API returns an array of output items. This function processes output items sequentially,
+    consolidating reasoning into the main response content and creating tool spans for specific tool call types.
+    """
+    conversation_context = original_input.copy() if original_input else []
+
+    # Collect all reasoning content to include in the final response
+    all_reasoning_content = []
+    final_message_content = ""
+    final_tool_calls = []
+
+    # Tool call types that should create separate tool spans
+    # https://platform.openai.com/docs/guides/tools
+    tool_span_types = {
+        "mcp_call", "mcp_list_tools", "web_search_call", "file_search_call",
+        "computer_call", "image_generation_call", "code_interpreter_call",
+        "local_shell_call", "custom_tool_call"
+    }
+
+    # loop through output items to construct tool calls, messages, and reasoning
+    for item in output_items:
+        if hasattr(item, "type") and item.type == "reasoning":
+            # Extract reasoning content but don't create separate spans
+            reasoning_content = _extract_reasoning_content(item)
+            if reasoning_content:
+                all_reasoning_content.append(reasoning_content)
+
+        elif hasattr(item, "type") and item.type == "message":
+            # Extract message content
+            message_content = _extract_message_content(item)
+            final_message_content = message_content
+
+        elif hasattr(item, "type") and item.type == "function_call":
+            # Collect regular function calls (not tool spans)
+            tool_call = {
+                "id": getattr(item, "id", ""),
+                "function": {"name": getattr(item, "name", ""), "arguments": getattr(item, "arguments", "")},
+                "type": "function",
+            }
+            final_tool_calls.append(tool_call)
+
+    # Create consolidated output with reasoning as separate Messages
+    consolidated_output_messages = []
+
+    # Add reasoning as special reasoning objects (not Message objects)
+    if all_reasoning_content:
+        for reasoning_text in all_reasoning_content:
+            # Create a special reasoning object instead of a Message
+            reasoning_obj = {
+                "type": "reasoning",
+                "content": reasoning_text,
+            }
+            consolidated_output_messages.append(reasoning_obj)
+
+    # Add the final response message
+    if final_message_content:
+        response_message = _convert_to_galileo_message(final_message_content, "assistant")
+        consolidated_output_messages.append(response_message)
+
+    # Create the final consolidated output for the LLM span with content and reasoning
+    if consolidated_output_messages or final_tool_calls:
+        # WORKAROUND: Serialize the array of Messages and reasoning objects into a string since LLM span output
+        # doesn't support a list of Messages. This is a temporary solution until better Responses support is added.
+        serialized_items = []
+        for item in consolidated_output_messages:
+            if isinstance(item, dict) and item.get("type") == "reasoning":
+                # Keep reasoning objects as-is
+                serialized_items.append(item)
+            else:
+                # Convert Message objects to dicts
+                serialized_items.append(item.model_dump(exclude_none=True))
+
+        messages_serialized = json.dumps(serialized_items, indent=2)
+
+        # Create a single Message with the serialized array as content
+        consolidated_output = _convert_to_galileo_message(messages_serialized, "assistant")
+
+        # Add tool calls if present
+        if final_tool_calls:
+            consolidated_output.tool_calls = [
+                ToolCall(
+                    id=tc["id"],
+                    function=ToolCallFunction(name=tc["function"]["name"], arguments=tc["function"]["arguments"])
+                ) for tc in final_tool_calls
+            ]
+
+        # Create a single consolidated span with the serialized messages
+        galileo_logger.add_llm_span(
+            input=conversation_context,
+            output=consolidated_output,
+            model=model,
+            name="response",
+            metadata={
+                "type": "consolidated_response",
+                "includes_reasoning": str(bool(all_reasoning_content)),
+                "reasoning_count": str(len(all_reasoning_content)),
+                "serialized_messages": "true",  # Flag to indicate this contains serialized messages
+                **{str(k): str(v) for k, v in (model_parameters or {}).items()}
+            }
+        )
+
+    # Update conversation context with only Message objects (not reasoning objects)
+    for item in consolidated_output_messages:
+        if not (isinstance(item, dict) and item.get("type") == "reasoning"):
+            conversation_context.append(item)
+
+    # Process tool calls that should create separate tool spans
+    # TODO: These should be child tool call spans once the child span functionality is fixed
+    for item in output_items:
+        if hasattr(item, "type") and item.type in tool_span_types:
+            # Extract tool call data based on type
+            tool_id = getattr(item, "id", "")
+            tool_status = getattr(item, "status", "")
+
+            # Extract type-specific data
+            if item.type == "web_search_call":
+                # https://platform.openai.com/docs/api-reference/responses/object#responses/object-output-web-search-tool-call
+                # Extract web search action details - query/type as input, sources as output
+                action = getattr(item, "action", None)
+                if action:
+                    # Input: grab relevant keys if they exist
+                    input_data = {"type": getattr(action, "type", "")}
+                    if hasattr(action, "query"):
+                        input_data["query"] = getattr(action, "query", "")
+                    if hasattr(action, "url"):
+                        input_data["url"] = getattr(action, "url", "")
+                    if hasattr(action, "pattern"):
+                        input_data["pattern"] = getattr(action, "pattern", "")
+                    tool_input = json.dumps(input_data, indent=2)
+
+                    # Output: grab relevant keys if they exist
+                    output_data = {}
+                    if hasattr(action, "sources") and getattr(action, "sources"):
+                        sources = getattr(action, "sources", [])
+                        output_data["sources"] = [
+                            source.__dict__ if hasattr(source, "__dict__") else str(source)
+                            for source in sources
+                        ]
+                    if not output_data:
+                        output_data = {"status": tool_status}
+                    tool_output = json.dumps(output_data, indent=2)
+                else:
+                    tool_input = json.dumps({}, indent=2)
+                    tool_output = f"Status: {tool_status}"
+
+            elif item.type == "mcp_call":
+                # Extract MCP call details - send raw JSON
+                tool_input = json.dumps({
+                    "name": getattr(item, "name", ""),
+                    "server_label": getattr(item, "server_label", ""),
+                    "arguments": getattr(item, "arguments", ""),
+                }, indent=2)
+                tool_output = json.dumps({
+                    "status": tool_status,
+                    "output": getattr(item, "output", "")
+                }, indent=2)
+
+            elif item.type == "mcp_list_tools":
+                # Extract listed tools - server as input, tools as output
+                tool_input = getattr(item, "server_label", "")
+                tool_output = json.dumps(
+                    [tool.__dict__ if hasattr(tool, "__dict__") else tool for tool in getattr(item, "tools", [])],
+                    indent=2,
+                )
+
+            elif item.type == "file_search_call":
+                # https://platform.openai.com/docs/api-reference/responses/object#responses/object-output-file-search-tool-call
+                # Extract file search details - send raw JSON
+                tool_input = json.dumps({
+                    "queries": getattr(item, "queries", []),
+                    "results": [
+                        result.__dict__ if hasattr(result, "__dict__") else result
+                        for result in getattr(item, "results", [])
+                    ],
+                }, indent=2)
+                tool_output = f"Status: {tool_status}"
+
+            elif item.type == "computer_call":
+                # https://platform.openai.com/docs/api-reference/responses/object#responses/object-output-computer-tool-call
+                # Extract computer call action details - send raw JSON
+                action = getattr(item, "action", None)
+                tool_input = json.dumps(action.__dict__ if action else {}, indent=2)
+                tool_output = f"Status: {tool_status}"
+
+            elif item.type == "image_generation_call":
+                # https://platform.openai.com/docs/api-reference/responses/object#responses/object-output-image-generation-call
+                # Extract image generation details - send raw JSON
+                tool_input = json.dumps({
+                    "id": getattr(item, "id", ""),
+                    "status": getattr(item, "status", ""),
+                }, indent=2)
+                tool_output = getattr(item, "result", "")
+
+            elif item.type == "code_interpreter_call":
+                # https://platform.openai.com/docs/api-reference/responses/object#responses/object-output-code-interpreter-tool-call
+                # Extract code interpreter details - send raw JSON
+                tool_input = json.dumps({
+                    "id": getattr(item, "id", ""),
+                    "code": getattr(item, "code", ""),
+                    "container_id": getattr(item, "container_id", ""),
+                    "status": getattr(item, "status", ""),
+                }, indent=2)
+                tool_output = json.dumps(getattr(item, "outputs", []), indent=2)
+
+            elif item.type == "local_shell_call":
+                # https://platform.openai.com/docs/api-reference/responses/object#responses/object-output-local-shell-call
+                # Extract local shell call details - send raw JSON
+                action = getattr(item, "action", None)
+                tool_input = json.dumps({
+                    "id": getattr(item, "id", ""),
+                    "call_id": getattr(item, "call_id", ""),
+                    "status": getattr(item, "status", ""),
+                    "action": action.__dict__ if action else {},
+                }, indent=2)
+                tool_output = f"Status: {tool_status}"
+
+            elif item.type == "custom_tool_call":
+                # Extract custom tool details - send raw JSON
+                tool_input = json.dumps({
+                    "name": getattr(item, "name", ""),
+                    "arguments": getattr(item, "arguments", ""),
+                }, indent=2)
+                tool_output = json.dumps({
+                    "status": tool_status,
+                    "output": getattr(item, "output", "")
+                }, indent=2)
+            else:
+                # Fallback for unknown tool types - send raw JSON
+                tool_input = json.dumps({
+                    "name": getattr(item, "name", ""),
+                    "arguments": getattr(item, "arguments", ""),
+                }, indent=2)
+                tool_output = f"Status: {tool_status}\nOutput: {getattr(item, 'output', '')}"
+
+            # Create a tool span with the tool type as the name
+            galileo_logger.add_tool_span(
+                input=tool_input,
+                output=tool_output,
+                name=item.type,  # Use the tool type as the span name
+                metadata={
+                    "tool_id": tool_id,
+                    "tool_type": item.type,
+                    "tool_status": tool_status,
+                    **{str(k): str(v) for k, v in (model_parameters or {}).items()}
+                }
+            )
+
+    return conversation_context
+
+
+def _extract_responses_output(output_items: list) -> dict:
+ """Extract the final message and tool calls from Responses API output items.""" + final_message = None + tool_calls = [] + + for item in output_items: + if hasattr(item, "type") and item.type == "message": + final_message = {"role": getattr(item, "role", "assistant"), "content": ""} + + content = getattr(item, "content", []) + if isinstance(content, list): + text_parts = [] + for content_item in content: + if hasattr(content_item, "text"): + text_parts.append(content_item.text) + elif isinstance(content_item, dict) and "text" in content_item: + text_parts.append(content_item["text"]) + final_message["content"] = "".join(text_parts) + else: + final_message["content"] = str(content) + + elif hasattr(item, "type") and item.type == "function_call": + tool_call = { + "id": getattr(item, "id", ""), + "function": {"name": getattr(item, "name", ""), "arguments": getattr(item, "arguments", "")}, + "type": "function", + } + tool_calls.append(tool_call) + + if final_message: + if tool_calls: + final_message["tool_calls"] = tool_calls + return final_message + if tool_calls: + return {"role": "assistant", "tool_calls": tool_calls} + return {"role": "assistant", "content": ""} + + def _extract_data_from_default_response(resource: OpenAiModuleDefinition, response: dict[str, Any]) -> Any: if response is None: return None, "", None @@ -325,6 +735,10 @@ def _extract_data_from_default_response(resource: OpenAiModuleDefinition, respon completion = ( _extract_chat_response(choice.message.__dict__) if _is_openai_v1() else choice.get("message", None) ) + elif resource.type == "response": + # Handle Responses API structure + output = response.get("output", []) + completion = _extract_responses_output(output) usage = _parse_usage(response.get("usage")) @@ -335,10 +749,27 @@ def _extract_streamed_openai_response(resource: OpenAiModuleDefinition, chunks: completion = defaultdict(str) if resource.type == "chat" else "" model, usage = None, None + # For Responses API, we just need to find the final completed event + if resource.type == "response": + final_response = None + for chunk in chunks: if _is_openai_v1(): chunk = chunk.__dict__ + if resource.type == "response": + chunk_type = chunk.get("type", "") + + if chunk_type == "response.completed": + final_response = chunk.get("response") + if final_response: + model = getattr(final_response, "model", None) + usage_obj = getattr(final_response, "usage", None) + if usage_obj: + usage = _parse_usage(usage_obj.__dict__ if hasattr(usage_obj, "__dict__") else usage_obj) + + continue + model = model or chunk.get("model", None) or None usage = chunk.get("usage", None) @@ -414,7 +845,23 @@ def get_response_for_chat() -> Any: or None ) - return model, get_response_for_chat() if resource.type == "chat" else completion, usage + if resource.type == "chat": + return model, get_response_for_chat(), usage + if resource.type == "response": + if final_response: + output_items = getattr(final_response, "output", []) + # Since we get a response object back, we can use the same function to extract the output + response_message = _extract_responses_output(output_items) + # Return the full response structure so streaming can access output_items + full_response = { + "role": "assistant", + "content": response_message.get("content", ""), + "tool_calls": response_message.get("tool_calls"), + "output": output_items # Include output items for processing + } + return model, full_response, usage + return model, {"role": "assistant", "content": ""}, usage + return model, completion, usage def 
@@ -442,7 +889,15 @@ def _wrap(
     else:
         # If we don't have an active trace, start a new trace
         # We will conclude it at the end
-        galileo_logger.start_trace(input=serialize_to_str(input_data.input), name=input_data.name)
+        # Convert to a list of Galileo Messages, since we can't send a plain list of messages to the span
+        # and we want consistency
+        if isinstance(input_data.input, list):
+            trace_input_messages = [_convert_to_galileo_message(msg) for msg in input_data.input]
+        else:
+            trace_input_messages = [_convert_to_galileo_message(input_data.input)]
+
+        # Serialize with "messages" wrapper for UI compatibility
+        trace_input = {"messages": [msg.model_dump(exclude_none=True) for msg in trace_input_messages]}
+        galileo_logger.start_trace(input=serialize_to_str(trace_input), name=input_data.name)
         should_complete_trace = True
 
     try:
@@ -476,28 +931,66 @@ def _wrap(
 
     duration_ns = round((end_time - start_time).total_seconds() * 1e9)
 
-    # Add a span to the current trace or span (if this is a nested trace)
-    galileo_logger.add_llm_span(
-        input=input_data.input,
-        output=completion,
-        tools=input_data.tools,
-        name=input_data.name,
-        model=model,
-        temperature=input_data.temperature,
-        duration_ns=duration_ns,
-        num_input_tokens=usage.get("prompt_tokens", 0),
-        num_output_tokens=usage.get("completion_tokens", 0),
-        total_tokens=usage.get("total_tokens", 0),
-        metadata={str(k): str(v) for k, v in input_data.model_parameters.items()},
-        # openai client library doesn't return http_status code, so we only can hardcode it here
-        # because we if we parsed and extracted data from response it means we get it and it's 200OK
-        status_code=status_code,
-    )
+    # Convert to a list of Galileo Messages, since we can't send a regular list to the span input
+    if isinstance(input_data.input, list):
+        span_input = [_convert_to_galileo_message(msg) for msg in input_data.input]
+    else:
+        span_input = [_convert_to_galileo_message(input_data.input)]
+
+    # Process Responses API output items sequentially if present
+    final_conversation_context = span_input.copy()
+    if open_ai_resource.type == "response" and openai_response:
+        response_dict = openai_response.__dict__ if _is_openai_v1() else openai_response
+        output_items = response_dict.get("output", [])
+
+        # Process all output items sequentially and get the final context
+        final_conversation_context = _process_output_items(
+            output_items,
+            galileo_logger,
+            model,
+            span_input,
+            input_data.model_parameters
+        )
+    else:
+        # For non-Responses APIs (chat or completion), create the main span as before
+        span_output = _convert_to_galileo_message(completion, "assistant")
+
+        # Add a span to the current trace or span (if this is a nested trace)
+        galileo_logger.add_llm_span(
+            input=span_input,
+            output=span_output,
+            tools=input_data.tools,
+            name=input_data.name,
+            model=model,
+            temperature=input_data.temperature,
+            duration_ns=duration_ns,
+            num_input_tokens=usage.get("prompt_tokens", 0),
+            num_output_tokens=usage.get("completion_tokens", 0),
+            total_tokens=usage.get("total_tokens", 0),
+            metadata={str(k): str(v) for k, v in input_data.model_parameters.items()},
+            # The openai client library doesn't return an http status code, so we can only hardcode it here,
+            # because if we parsed and extracted data from the response, it means we got it and it's 200 OK
+            status_code=status_code,
+        )
 
     # Conclude the trace if this is the top-level call
     if should_complete_trace:
+        if open_ai_resource.type == "response":
+            # For the Responses API, use the final conversation context from processing
+            full_conversation = final_conversation_context
+        else:
+            # For other APIs, add the final span output
+            full_conversation = []
+            if isinstance(input_data.input, list):
+                full_conversation.extend([_convert_to_galileo_message(msg) for msg in input_data.input])
+            else:
+                full_conversation.append(_convert_to_galileo_message(input_data.input))
+            full_conversation.append(span_output)
+
+        # Serialize with "messages" wrapper for UI compatibility
+        trace_output = {"messages": [msg.model_dump(exclude_none=True) for msg in full_conversation]}
         galileo_logger.conclude(
-            output=serialize_to_str(completion), duration_ns=duration_ns, status_code=status_code
+            output=serialize_to_str(trace_output), duration_ns=duration_ns, status_code=status_code
         )
 
     # we want to re-raise exception after we process openai_response
@@ -593,25 +1086,84 @@ def _finalize(self) -> None:
         # TODO: make sure completion_start_time what we want
         duration_ns = round((end_time - self.completion_start_time).total_seconds() * 1e9)
 
-        # Add a span to the current trace or span (if this is a nested trace)
-        self.logger.add_llm_span(
-            input=self.input_data.input,
-            output=completion,
-            tools=self.input_data.tools,
-            name=self.input_data.name,
-            model=model,
-            temperature=self.input_data.temperature,
-            duration_ns=duration_ns,
-            num_input_tokens=usage.get("prompt_tokens", 0),
-            num_output_tokens=usage.get("completion_tokens", 0),
-            total_tokens=usage.get("total_tokens", 0),
-            metadata={str(k): str(v) for k, v in self.input_data.model_parameters.items()},
-            status_code=self.status_code,
-        )
+        if isinstance(self.input_data.input, list):
+            span_input = [_convert_to_galileo_message(msg) for msg in self.input_data.input]
+        else:
+            span_input = [_convert_to_galileo_message(self.input_data.input)]
+
+        # TODO: we could probably create a shared function to handle both the streaming and non-streaming paths
+        # Process Responses API output items sequentially if present (same as non-streaming)
+        final_conversation_context = span_input.copy()
+        if self.resource.type == "response" and completion:
+            # For the streaming Responses API, we need to extract output items from the completion
+            # The completion should contain the final response with output items
+            if isinstance(completion, dict) and "output" in completion:
+                output_items = completion.get("output", [])
+
+                # Process all output items sequentially and get the final context
+                final_conversation_context = _process_output_items(
+                    output_items,
+                    self.logger,
+                    model,
+                    span_input,
+                    self.input_data.model_parameters
+                )
+            else:
+                # Fallback: create a basic span if there are no output items
+                span_output = _convert_to_galileo_message(completion, "assistant")
+                self.logger.add_llm_span(
+                    input=span_input,
+                    output=span_output,
+                    tools=self.input_data.tools,
+                    name=self.input_data.name,
+                    model=model,
+                    temperature=self.input_data.temperature,
+                    duration_ns=duration_ns,
+                    num_input_tokens=usage.get("prompt_tokens", 0),
+                    num_output_tokens=usage.get("completion_tokens", 0),
+                    total_tokens=usage.get("total_tokens", 0),
+                    metadata={str(k): str(v) for k, v in self.input_data.model_parameters.items()},
+                    status_code=self.status_code,
+                )
+        else:
+            # For non-Responses APIs (chat or completion), create the main span as before
+            span_output = _convert_to_galileo_message(completion, "assistant")
+
+            # Add a span to the current trace or span (if this is a nested trace)
+            self.logger.add_llm_span(
+                input=span_input,
+                output=span_output,
+                tools=self.input_data.tools,
+                name=self.input_data.name,
+                model=model,
+                temperature=self.input_data.temperature,
+                duration_ns=duration_ns,
+                num_input_tokens=usage.get("prompt_tokens", 0),
+                num_output_tokens=usage.get("completion_tokens", 0),
+                total_tokens=usage.get("total_tokens", 0),
+                metadata={str(k): str(v) for k, v in self.input_data.model_parameters.items()},
+                status_code=self.status_code,
+            )
 
         # Conclude the trace if this is the top-level call
         if self.should_complete_trace:
-            self.logger.conclude(output=completion, duration_ns=duration_ns, status_code=self.status_code)
+            if self.resource.type == "response":
+                # For the Responses API, use the final conversation context from processing
+                full_conversation = final_conversation_context
+            else:
+                # For other APIs, add the final span output
+                full_conversation = []
+                if isinstance(self.input_data.input, list):
+                    full_conversation.extend([_convert_to_galileo_message(msg) for msg in self.input_data.input])
+                else:
+                    full_conversation.append(_convert_to_galileo_message(self.input_data.input))
+                full_conversation.append(_convert_to_galileo_message(completion, "assistant"))
+
+            # Serialize with "messages" wrapper for UI compatibility
+            trace_output = {"messages": [msg.model_dump(exclude_none=True) for msg in full_conversation]}
+            self.logger.conclude(
+                output=serialize_to_str(trace_output), duration_ns=duration_ns, status_code=self.status_code
+            )
 
 
 class OpenAIGalileo:
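For illustration (not part of the patch): with the serialization workaround in _process_output_items above, a Responses call that produced one reasoning item plus a final message yields a single LLM span whose output content is the JSON-serialized item array. A minimal sketch of that shape, with hypothetical values:

    import json

    # A reasoning object stays a plain dict; the final assistant Message is
    # dumped via model_dump(exclude_none=True), matching the test expectations below.
    consolidated = [
        {"type": "reasoning", "content": "First, identify the requested city..."},
        {"content": "The weather in New York is sunny.", "role": "assistant"},
    ]
    print(json.dumps(consolidated, indent=2))  # this string becomes the span output's content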
status="completed", + ) + + @pytest.fixture def test_dataset_row_id() -> None: str(uuid4()) diff --git a/tests/test_openai.py b/tests/test_openai.py index 620c73a0..3d14d790 100644 --- a/tests/test_openai.py +++ b/tests/test_openai.py @@ -5,12 +5,13 @@ from httpx import Request, Response from openai import Stream from openai.types.chat import ChatCompletionChunk +from openai.types.responses import ResponseCompletedEvent from galileo import Message, MessageRole, galileo_context, log from galileo.openai import OpenAIGalileo, openai from galileo_core.schemas.logging.span import LlmSpan, WorkflowSpan from tests.testutils.setup import setup_mock_logstreams_client, setup_mock_projects_client, setup_mock_traces_client -from tests.testutils.streaming import EventStream +from tests.testutils.streaming import EventStream, ResponsesEventStream def openai_incorrect_api_key_error() -> bytes: @@ -62,7 +63,11 @@ def test_basic_openai_call( assert len(payload.traces[0].spans) == 1 assert isinstance(payload.traces[0].spans[0], LlmSpan) assert payload.traces[0].spans[0].status_code == 200 - assert payload.traces[0].input == '[{"role": "user", "content": "Say this is a test"}]' + assert payload.traces[0].input == '{"messages": [{"content": "Say this is a test", "role": "user"}]}' + assert ( + payload.traces[0].output + == '{"messages": [{"content": "Say this is a test", "role": "user"}, {"content": "The mock is working! ;)", "role": "assistant"}]}' + ) assert payload.traces[0].spans[0].input == [Message(content="Say this is a test", role=MessageRole.user)] assert payload.traces[0].spans[0].output == Message(content="The mock is working! ;)", role=MessageRole.assistant) assert payload.traces[0].spans[0].tools == [ @@ -115,8 +120,11 @@ def test_streamed_openai_call( assert len(payload.traces[0].spans) == 1 assert isinstance(payload.traces[0].spans[0], LlmSpan) - assert payload.traces[0].input == '[{"role": "user", "content": "Say this is a test"}]' - assert payload.traces[0].output == "Hello" + assert payload.traces[0].input == '{"messages": [{"content": "Say this is a test", "role": "user"}]}' + assert ( + payload.traces[0].output + == '{"messages": [{"content": "Say this is a test", "role": "user"}, {"content": "Hello", "role": "assistant"}]}' + ) assert payload.traces[0].spans[0].status_code == 200 assert payload.traces[0].spans[0].input == [Message(content="Say this is a test", role=MessageRole.user)] assert payload.traces[0].spans[0].output == Message(content="Hello", role=MessageRole.assistant) @@ -243,7 +251,10 @@ def call_openai(model: str = "gpt-3.5-turbo"): payload = mock_traces_client_instance.ingest_traces.call_args[0][0] assert len(payload.traces) == 1 assert payload.traces[0].status_code == 401 - assert payload.traces[0].output == "" + assert ( + payload.traces[0].output + == '{"messages": [{"content": "Say this is a test", "role": "user"}, {"content": "", "role": "assistant"}]}' + ) @patch( @@ -359,3 +370,279 @@ def test_openai_calls_in_active_trace( assert len(payload.traces[0].spans) == 2 assert payload.traces[0].spans[0].type == "llm" assert payload.traces[0].spans[1].type == "llm" + + +@patch("openai.resources.chat.Completions.create") +@patch("galileo.logger.logger.LogStreams") +@patch("galileo.logger.logger.Projects") +@patch("galileo.logger.logger.Traces") +def test_chat_completions_multiple_messages( + mock_traces_client: Mock, + mock_projects_client: Mock, + mock_logstreams_client: Mock, + openai_create, + create_chat_completion, +) -> None: + """Test Chat Completions API with multiple 
messages in conversation.""" + mock_traces_client_instance = setup_mock_traces_client(mock_traces_client) + setup_mock_projects_client(mock_projects_client) + setup_mock_logstreams_client(mock_logstreams_client) + openai_create.return_value = create_chat_completion + + galileo_context.reset() + OpenAIGalileo().register_tracing() + + input_messages = [ + {"role": "user", "content": "What's the weather like today?"}, + { + "role": "assistant", + "content": "I'd be happy to help you with the weather! However, I don't have access to real-time weather data.", + }, + {"role": "user", "content": "Can you check the weather for New York?"}, + ] + + response = openai.chat.completions.create(messages=input_messages, model="gpt-4o") + + response_text = response.choices[0].message.content + assert response_text == "The mock is working! ;)" + + galileo_context.flush() + payload = mock_traces_client_instance.ingest_traces.call_args[0][0] + + assert len(payload.traces) == 1 + assert payload.traces[0].status_code == 200 + assert len(payload.traces[0].spans) == 1 + assert isinstance(payload.traces[0].spans[0], LlmSpan) + assert payload.traces[0].spans[0].status_code == 200 + + expected_input = '{"messages": [{"content": "What\'s the weather like today?", "role": "user"}, {"content": "I\'d be happy to help you with the weather! However, I don\'t have access to real-time weather data.", "role": "assistant"}, {"content": "Can you check the weather for New York?", "role": "user"}]}' + assert payload.traces[0].input == expected_input + + expected_output = '{"messages": [{"content": "What\'s the weather like today?", "role": "user"}, {"content": "I\'d be happy to help you with the weather! However, I don\'t have access to real-time weather data.", "role": "assistant"}, {"content": "Can you check the weather for New York?", "role": "user"}, {"content": "The mock is working! ;)", "role": "assistant"}]}' + assert payload.traces[0].output == expected_output + + expected_span_input = [ + Message(content="What's the weather like today?", role=MessageRole.user), + Message( + content="I'd be happy to help you with the weather! However, I don't have access to real-time weather data.", + role=MessageRole.assistant, + ), + Message(content="Can you check the weather for New York?", role=MessageRole.user), + ] + assert payload.traces[0].spans[0].input == expected_span_input + assert payload.traces[0].spans[0].output == Message(content="The mock is working! 
;)", role=MessageRole.assistant) + + +@patch("openai.resources.responses.Responses.create") +@patch("galileo.logger.logger.LogStreams") +@patch("galileo.logger.logger.Projects") +@patch("galileo.logger.logger.Traces") +def test_basic_responses_api_call( + mock_traces_client: Mock, + mock_projects_client: Mock, + mock_logstreams_client: Mock, + openai_create, + create_responses_response, +) -> None: + mock_traces_client_instance = setup_mock_traces_client(mock_traces_client) + setup_mock_projects_client(mock_projects_client) + setup_mock_logstreams_client(mock_logstreams_client) + openai_create.return_value = create_responses_response + + galileo_context.reset() + OpenAIGalileo().register_tracing() + + response = openai.responses.create(input="Say this is a test", model="gpt-4o") + + response_text = response.output_text + assert response_text == "This is a test response" + + galileo_context.flush() + payload = mock_traces_client_instance.ingest_traces.call_args[0][0] + + assert len(payload.traces) == 1 + assert payload.traces[0].status_code == 200 + assert len(payload.traces[0].spans) == 1 + assert isinstance(payload.traces[0].spans[0], LlmSpan) + assert payload.traces[0].spans[0].status_code == 200 + assert payload.traces[0].input == '{"messages": [{"content": "Say this is a test", "role": "user"}]}' + assert ( + payload.traces[0].output + == '{"messages": [{"content": "Say this is a test", "role": "user"}, {"content": "This is a test response", "role": "assistant"}]}' + ) + assert payload.traces[0].spans[0].input == [Message(content="Say this is a test", role=MessageRole.user)] + assert payload.traces[0].spans[0].output == Message(content="This is a test response", role=MessageRole.assistant) + + +@patch("openai.resources.responses.Responses.create") +@patch("galileo.logger.logger.LogStreams") +@patch("galileo.logger.logger.Projects") +@patch("galileo.logger.logger.Traces") +def test_responses_api_with_tools( + mock_traces_client: Mock, + mock_projects_client: Mock, + mock_logstreams_client: Mock, + openai_create, + create_responses_response_with_tools, +) -> None: + mock_traces_client_instance = setup_mock_traces_client(mock_traces_client) + setup_mock_projects_client(mock_projects_client) + setup_mock_logstreams_client(mock_logstreams_client) + openai_create.return_value = create_responses_response_with_tools + + galileo_context.reset() + OpenAIGalileo().register_tracing() + + openai.responses.create( + input="What's the weather like?", + model="gpt-4o", + tools=[ + { + "type": "function", + "name": "get_weather", + "description": "Get the current weather", + "parameters": { + "type": "object", + "properties": {"location": {"type": "string"}}, + "required": ["location"], + }, + } + ], + ) + + galileo_context.flush() + payload = mock_traces_client_instance.ingest_traces.call_args[0][0] + + assert len(payload.traces) == 1 + assert payload.traces[0].status_code == 200 + assert len(payload.traces[0].spans) == 1 + assert isinstance(payload.traces[0].spans[0], LlmSpan) + assert payload.traces[0].spans[0].tools == [ + { + "type": "function", + "name": "get_weather", + "description": "Get the current weather", + "parameters": {"type": "object", "properties": {"location": {"type": "string"}}, "required": ["location"]}, + } + ] + + assert payload.traces[0].spans[0].output.tool_calls is not None + assert len(payload.traces[0].spans[0].output.tool_calls) == 1 + assert payload.traces[0].spans[0].output.tool_calls[0].function.name == "get_weather" + assert 
+    assert payload.traces[0].spans[0].output.tool_calls[0].id == "fc_test456"
+
+
+@patch("openai.resources.responses.Responses.create")
+@patch("galileo.logger.logger.LogStreams")
+@patch("galileo.logger.logger.Projects")
+@patch("galileo.logger.logger.Traces")
+def test_responses_api_multiple_messages(
+    mock_traces_client: Mock,
+    mock_projects_client: Mock,
+    mock_logstreams_client: Mock,
+    openai_create,
+    create_responses_response,
+) -> None:
+    """Test the Responses API with multiple messages in a conversation."""
+    mock_traces_client_instance = setup_mock_traces_client(mock_traces_client)
+    setup_mock_projects_client(mock_projects_client)
+    setup_mock_logstreams_client(mock_logstreams_client)
+    openai_create.return_value = create_responses_response
+
+    galileo_context.reset()
+    OpenAIGalileo().register_tracing()
+
+    input_messages = [
+        {"role": "user", "content": "What's the weather like today?"},
+        {
+            "role": "assistant",
+            "content": "I'd be happy to help you with the weather! However, I don't have access to real-time weather data.",
+        },
+        {"role": "user", "content": "Can you check the weather for New York?"},
+    ]
+
+    response = openai.responses.create(input=input_messages, model="gpt-4o")
+
+    assert response.output is not None
+    assert len(response.output) == 1
+    assert response.output[0].role == "assistant"
+
+    galileo_context.flush()
+    payload = mock_traces_client_instance.ingest_traces.call_args[0][0]
+
+    assert len(payload.traces) == 1
+    assert payload.traces[0].status_code == 200
+    assert len(payload.traces[0].spans) == 1
+    assert isinstance(payload.traces[0].spans[0], LlmSpan)
+    assert payload.traces[0].spans[0].status_code == 200
+
+    expected_input = '{"messages": [{"content": "What\'s the weather like today?", "role": "user"}, {"content": "I\'d be happy to help you with the weather! However, I don\'t have access to real-time weather data.", "role": "assistant"}, {"content": "Can you check the weather for New York?", "role": "user"}]}'
+    assert payload.traces[0].input == expected_input
+
+    expected_output = '{"messages": [{"content": "What\'s the weather like today?", "role": "user"}, {"content": "I\'d be happy to help you with the weather! However, I don\'t have access to real-time weather data.", "role": "assistant"}, {"content": "Can you check the weather for New York?", "role": "user"}, {"content": "This is a test response", "role": "assistant"}]}'
+    assert payload.traces[0].output == expected_output
+
+    expected_span_input = [
+        Message(content="What's the weather like today?", role=MessageRole.user),
+        Message(
+            content="I'd be happy to help you with the weather! However, I don't have access to real-time weather data.",
+            role=MessageRole.assistant,
+        ),
+        Message(content="Can you check the weather for New York?", role=MessageRole.user),
+    ]
+    assert payload.traces[0].spans[0].input == expected_span_input
+    assert payload.traces[0].spans[0].output == Message(content="This is a test response", role=MessageRole.assistant)
+
+
+@patch("openai.resources.responses.Responses.create")
+@patch("galileo.logger.logger.LogStreams")
+@patch("galileo.logger.logger.Projects")
+@patch("galileo.logger.logger.Traces")
+def test_responses_api_streaming(
+    mock_traces_client: Mock, mock_projects_client: Mock, mock_logstreams_client: Mock, openai_create
+) -> None:
+    """Test the Responses API with streaming events."""
+    mock_traces_client_instance = setup_mock_traces_client(mock_traces_client)
+    setup_mock_projects_client(mock_projects_client)
+    setup_mock_logstreams_client(mock_logstreams_client)
+
+    openai_create.return_value = Stream(
+        cast_to=ResponseCompletedEvent,
+        client=openai.OpenAI(),
+        response=Response(status_code=200, content=ResponsesEventStream()),
+    )
+
+    galileo_context.reset()
+    OpenAIGalileo().register_tracing()
+
+    stream = openai.responses.create(input="Say hello", model="gpt-4o", stream=True)
+
+    event_count = 0
+    completed_event = None
+    for event in stream:
+        event_count += 1
+        if hasattr(event, "type") and event.type == "response.completed":
+            completed_event = event
+
+    assert event_count >= 1
+    assert completed_event is not None
+    assert completed_event.response.status == "completed"
+
+    galileo_context.flush()
+    payload = mock_traces_client_instance.ingest_traces.call_args[0][0]
+
+    assert len(payload.traces) == 1
+    assert payload.traces[0].status_code == 200
+    assert len(payload.traces[0].spans) == 1
+    assert isinstance(payload.traces[0].spans[0], LlmSpan)
+    assert payload.traces[0].spans[0].status_code == 200
+
+    assert payload.traces[0].input == '{"messages": [{"content": "Say hello", "role": "user"}]}'
+    assert (
+        payload.traces[0].output
+        == '{"messages": [{"content": "Say hello", "role": "user"}, {"content": "This is a test response", "role": "assistant"}]}'
+    )
+
+    assert payload.traces[0].spans[0].input == [Message(content="Say hello", role=MessageRole.user)]
+    assert payload.traces[0].spans[0].output == Message(content="This is a test response", role=MessageRole.assistant)
diff --git a/tests/testutils/streaming.py b/tests/testutils/streaming.py
index cef9f4a4..8cbd8959 100644
--- a/tests/testutils/streaming.py
+++ b/tests/testutils/streaming.py
@@ -4,6 +4,16 @@
 from openai import BaseModel
 from openai.types.chat import ChatCompletionChunk
+from openai.types.responses import (
+    Response,
+    ResponseCompletedEvent,
+    ResponseCreatedEvent,
+    ResponseInProgressEvent,
+    ResponseOutputMessage,
+    ResponseOutputText,
+    ResponseUsage,
+)
+from openai.types.responses.response_usage import InputTokensDetails, OutputTokensDetails
 
 
 def model_dict(m: BaseModel, **kwargs: Any) -> dict[str, Any]:
@@ -69,3 +79,61 @@ def __iter__(self) -> Generator:
 
         yield b"event: done\n"
         yield b"data: [DONE]\n\n"
+
+
+class ResponsesEventStream:
+    @staticmethod
+    def _dump_event(event) -> tuple[Optional[bytes], Optional[bytes]]:
+        """Format Responses API events for HTTP streaming."""
+        encoded_data = f"data: {json.dumps(model_dict(event))}\n\n".encode()
+        return None, encoded_data
+
+    def generate(self) -> Generator[ResponseCompletedEvent, None, None]:
+        """Generate Responses API streaming events using proper event objects."""
+        # Create a mock response object with all required fields (copied from the conftest.py fixture)
+        mock_response = Response(
+            id="resp_test123",
+            created_at=1758822441.0,
+            model="gpt-4o",
+            object="response",
+            output=[
+                ResponseOutputMessage(
+                    id="msg_test123",
+                    content=[
+                        ResponseOutputText(
+                            text="This is a test response", type="output_text", annotations=[], logprobs=[]
+                        )
+                    ],
+                    role="assistant",
+                    status="completed",
+                    type="message",
+                )
+            ],
+            parallel_tool_calls=True,
+            tool_choice="auto",
+            tools=[],
+            usage=ResponseUsage(
+                input_tokens=10,
+                input_tokens_details=InputTokensDetails(cached_tokens=0),
+                output_tokens=5,
+                output_tokens_details=OutputTokensDetails(reasoning_tokens=0),
+                total_tokens=15,
+            ),
+            status="completed",
+        )
+
+        # Yield the streaming events using proper event classes
+        yield ResponseCreatedEvent(response=mock_response, sequence_number=0, type="response.created")
+        yield ResponseInProgressEvent(response=mock_response, sequence_number=1, type="response.in_progress")
+        yield ResponseCompletedEvent(response=mock_response, sequence_number=2, type="response.completed")
+
+    def __iter__(self) -> Generator[bytes, None, None]:
+        """Format Responses API events for HTTP streaming."""
+        for _event in self.generate():
+            t, d = self._dump_event(_event)
+            if t:
+                yield t
+            if d:
+                yield d
+
+        yield b"data: [DONE]\n\n"
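A quick usage sketch (assuming a configured Galileo project and log stream; it mirrors the tests above) showing how the new Responses instrumentation is exercised end to end:

    from galileo import galileo_context
    from galileo.openai import OpenAIGalileo, openai

    OpenAIGalileo().register_tracing()  # patches Responses.create alongside Completions.create

    response = openai.responses.create(input="Say this is a test", model="gpt-4o")
    print(response.output_text)

    galileo_context.flush()  # ingests the trace; input/output are serialized as {"messages": [...]}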