Merged
ddtrace/llmobs/_integrations/openai_agents.py (9 changes: 5 additions & 4 deletions)
@@ -37,6 +37,7 @@
 from ddtrace.llmobs._utils import _get_nearest_llmobs_ancestor
 from ddtrace.llmobs._utils import _get_span_name
 from ddtrace.llmobs._utils import load_data_value
+from ddtrace.llmobs._utils import safe_json
 from ddtrace.trace import Span


@@ -232,13 +233,13 @@ def _llmobs_set_response_attributes(self, span: Span, oai_span: OaiSpanAdapter)
         if oai_span.response and oai_span.response.output:
             messages, tool_call_outputs = oai_span.llmobs_output_messages()

-            for tool_id, tool_name, tool_args in tool_call_outputs:
+            for tool_call_output in tool_call_outputs:
                 core.dispatch(
                     DISPATCH_ON_LLM_TOOL_CHOICE,
                     (
-                        tool_id,
-                        tool_name,
-                        tool_args,
+                        tool_call_output["tool_id"],
+                        tool_call_output["name"],
+                        safe_json(tool_call_output["arguments"]),
                         {
                             "trace_id": format_trace_id(span.trace_id),
                             "span_id": str(span.span_id),
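For orientation, here is a minimal, self-contained sketch (not part of the PR) of the consumption pattern after this change: `llmobs_output_messages()` now returns dict-like `ToolCall` entries rather than `(tool_id, tool_name, tool_args)` tuples, so the dispatch payload is built from keyed lookups, and the already-parsed arguments are serialized back to JSON before dispatch. `safe_json_sketch` below is a hypothetical stand-in for `ddtrace.llmobs._utils.safe_json`, whose real implementation may differ.

import json
from typing import Any


def safe_json_sketch(obj: Any) -> str:
    # Hypothetical stand-in for ddtrace.llmobs._utils.safe_json: best-effort dump.
    try:
        return json.dumps(obj)
    except (TypeError, ValueError):
        return str(obj)


# Dict-like ToolCall entries, as llmobs_output_messages() returns them after this PR.
tool_call_outputs = [
    {"tool_id": "call_123", "name": "get_weather", "arguments": {"city": "Paris"}},
]

for tool_call_output in tool_call_outputs:
    payload = (
        tool_call_output["tool_id"],
        tool_call_output["name"],
        safe_json_sketch(tool_call_output["arguments"]),  # arguments back to a JSON string
    )
    print(payload)  # ('call_123', 'get_weather', '{"city": "Paris"}')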
ddtrace/llmobs/_integrations/utils.py (178 changes: 61 additions & 117 deletions)
@@ -550,13 +550,35 @@ def openai_get_input_messages_from_response_input(
     Returns:
         - A list of processed messages
     """
+    processed, _ = _openai_parse_input_response_messages(messages)
+    return processed
+
+
+def _openai_parse_input_response_messages(
+    messages: Optional[Union[str, List[Dict[str, Any]]]], system_instructions: Optional[str] = None
+) -> Tuple[List[Dict[str, Any]], List[str]]:
+    """
+    Parses input messages from the openai responses api into a list of processed messages
+    and a list of tool call IDs.
+
+    Args:
+        messages: A list of input messages
+
+    Returns:
+        - A list of processed messages
+        - A list of tool call IDs
+    """
     processed: List[Dict[str, Any]] = []
+    tool_call_ids: List[str] = []

+    if system_instructions:
+        processed.append({"role": "system", "content": system_instructions})
+
     if not messages:
-        return processed
+        return processed, tool_call_ids

     if isinstance(messages, str):
-        return [{"role": "user", "content": messages}]
+        return [{"role": "user", "content": messages}], tool_call_ids

     for item in messages:
         processed_item: Dict[str, Union[str, List[ToolCall], List[ToolResult]]] = {}
@@ -574,7 +596,7 @@ def openai_get_input_messages_from_response_input(
processed_item["role"] = item["role"]
elif "call_id" in item and ("arguments" in item or "input" in item):
# Process `ResponseFunctionToolCallParam` or ResponseCustomToolCallParam type from input messages
arguments_str = item.get("arguments", "{}") or item.get("input", "{}")
arguments_str = item.get("arguments", "") or item.get("input", OAI_HANDOFF_TOOL_ARG)
arguments = safe_load_json(arguments_str)

tool_call_info = ToolCall(
@@ -585,7 +607,7 @@
             )
             processed_item.update(
                 {
-                    "role": "user",
+                    "role": "assistant",
                     "tool_calls": [tool_call_info],
                 }
             )
@@ -607,10 +629,11 @@
"tool_results": [tool_result_info],
}
)
tool_call_ids.append(item["call_id"])
if processed_item:
processed.append(processed_item)

return processed
return processed, tool_call_ids


def openai_get_output_messages_from_response(response: Optional[Any]) -> List[Dict[str, Any]]:
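A hedged usage sketch of the new input-parsing helper (assuming a checkout of this branch); the message dicts below are hypothetical examples of the three input shapes whose handling is visible in the hunks above.

from ddtrace.llmobs._integrations.utils import _openai_parse_input_response_messages

messages = [
    # Plain message: passed through with its role and content.
    {"role": "user", "content": "What's the weather in Paris?"},
    # ResponseFunctionToolCallParam-style item: becomes a tool call, now with
    # role "assistant" (changed from "user" in this PR).
    {"call_id": "call_123", "name": "get_weather", "arguments": '{"city": "Paris"}'},
    # FunctionCallOutput-style item: becomes a tool result, and its call_id is
    # now collected for span linking.
    {"call_id": "call_123", "output": '{"temp_c": 21}'},
]

processed, tool_call_ids = _openai_parse_input_response_messages(
    messages, system_instructions="You are a helpful assistant."
)
# processed[0] -> {"role": "system", "content": "You are a helpful assistant."}
# tool_call_ids -> ["call_123"]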
@@ -630,15 +653,33 @@ def openai_get_output_messages_from_response(response: Optional[Any]) -> List[Dict[str, Any]]:
     if not messages:
         return []

+    processed_messages, _ = _openai_parse_output_response_messages(messages)
+
+    return processed_messages
+
+
+def _openai_parse_output_response_messages(messages: List[Any]) -> Tuple[List[Dict[str, Any]], List[ToolCall]]:
+    """
+    Parses output messages from the openai responses api into a list of processed messages
+    and a list of tool call outputs.
+
+    Args:
+        messages: A list of output messages
+
+    Returns:
+        - A list of processed messages
+        - A list of tool call outputs
+    """
     processed: List[Dict[str, Any]] = []
+    tool_call_outputs: List[ToolCall] = []

     for item in messages:
         message = {}
         message_type = _get_attr(item, "type", "")

         if message_type == "message":
             text = ""
-            for content in _get_attr(item, "content", []):
+            for content in _get_attr(item, "content", []) or []:
                 text += str(_get_attr(content, "text", "") or "")
                 text += str(_get_attr(content, "refusal", "") or "")
             message.update({"role": _get_attr(item, "role", "assistant"), "content": text})
@@ -656,26 +697,29 @@ def openai_get_output_messages_from_response(response: Optional[Any]) -> List[Dict[str, Any]]:
                 }
             )
         elif message_type == "function_call" or message_type == "custom_tool_call":
-            arguments = _get_attr(item, "input", "") or _get_attr(item, "arguments", "{}")
-            arguments = safe_load_json(arguments)
+            call_id = _get_attr(item, "call_id", "")
+            name = _get_attr(item, "name", "")
+            raw_arguments = _get_attr(item, "input", "") or _get_attr(item, "arguments", OAI_HANDOFF_TOOL_ARG)
+            arguments = safe_load_json(raw_arguments)
             tool_call_info = ToolCall(
-                tool_id=_get_attr(item, "call_id", ""),
+                tool_id=call_id,
                 arguments=arguments,
-                name=_get_attr(item, "name", ""),
+                name=name,
                 type=_get_attr(item, "type", "function"),
             )
+            tool_call_outputs.append(tool_call_info)
             message.update(
                 {
                     "tool_calls": [tool_call_info],
                     "role": "assistant",
                 }
             )
         else:
-            message.update({"role": "assistant", "content": "Unsupported content type: {}".format(message_type)})
+            message.update({"content": str(item), "role": "assistant"})

         processed.append(message)

-    return processed
+    return processed, tool_call_outputs


 def openai_get_metadata_from_response(
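Another hedged sketch, this time of `_openai_parse_output_response_messages`; `FakeFunctionCall` and `FakeUnknownItem` are hypothetical stand-ins for openai SDK response output items. It illustrates the two visible behavior changes above: tool calls are collected and returned alongside the messages, and unrecognized item types are stringified with an assistant role instead of being replaced by an "Unsupported content type" placeholder. This fallback, together with the delegation to the shared parsers shown further down, appears to be how the reasoning-item fix described in the release note below lands; only part of the function is visible in this diff, so treat the details as assumptions.

from ddtrace.llmobs._integrations.utils import _openai_parse_output_response_messages


class FakeFunctionCall:
    type = "function_call"
    call_id = "call_123"
    name = "get_weather"
    arguments = '{"city": "Paris"}'


class FakeUnknownItem:
    type = "some_future_item_type"  # no explicit branch handles this

    def __str__(self):
        return "FakeUnknownItem()"


processed, tool_call_outputs = _openai_parse_output_response_messages(
    [FakeFunctionCall(), FakeUnknownItem()]
)
# processed[0] -> {"tool_calls": [<ToolCall tool_id="call_123" ...>], "role": "assistant"}
# processed[1] -> {"content": "FakeUnknownItem()", "role": "assistant"}
# tool_call_outputs -> the same ToolCall entries, for span linking by the caller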
@@ -1071,126 +1115,26 @@ def llmobs_input_messages(self) -> Tuple[List[Dict[str, Any]], List[str]]:
         - A list of processed messages
         - A list of tool call IDs for span linking purposes
         """
-        messages = self.input
-        processed: List[Dict[str, Any]] = []
-        tool_call_ids: List[str] = []
-
-        if self.response_system_instructions:
-            processed.append({"role": "system", "content": self.response_system_instructions})
-
-        if not messages:
-            return processed, tool_call_ids
-
-        if isinstance(messages, str):
-            return [{"content": messages, "role": "user"}], tool_call_ids
-
-        for item in messages:
-            processed_item: Dict[str, Union[str, List[Dict[str, str]]]] = {}
-            # Handle regular message
-            if "content" in item and "role" in item:
-                processed_item_content = ""
-                if isinstance(item["content"], list):
-                    for content in item["content"]:
-                        processed_item_content += content.get("text", "")
-                        processed_item_content += content.get("refusal", "")
-                else:
-                    processed_item_content = item["content"]
-                if processed_item_content:
-                    processed_item["content"] = processed_item_content
-                    processed_item["role"] = item["role"]
-            elif "call_id" in item and "arguments" in item:
-                """
-                Process `ResponseFunctionToolCallParam` type from input messages
-                """
-                try:
-                    arguments = json.loads(item["arguments"])
-                except json.JSONDecodeError:
-                    arguments = item["arguments"]
-                processed_item["tool_calls"] = [
-                    {
-                        "tool_id": item["call_id"],
-                        "arguments": arguments,
-                        "name": item.get("name", ""),
-                        "type": item.get("type", "function_call"),
-                    }
-                ]
-            elif "call_id" in item and "output" in item:
-                """
-                Process `FunctionCallOutput` type from input messages
-                """
-                output = item["output"]
-
-                if isinstance(output, str):
-                    try:
-                        output = json.loads(output)
-                    except json.JSONDecodeError:
-                        output = {"output": output}
-                tool_call_ids.append(item["call_id"])
-                processed_item["role"] = "tool"
-                processed_item["content"] = item["output"]
-                processed_item["tool_id"] = item["call_id"]
-            if processed_item:
-                processed.append(processed_item)
-
-        return processed, tool_call_ids
+        return _openai_parse_input_response_messages(self.input, self.response_system_instructions)

-    def llmobs_output_messages(self) -> Tuple[List[Dict[str, Any]], List[Tuple[str, str, str]]]:
+    def llmobs_output_messages(self) -> Tuple[List[Dict[str, Any]], List[ToolCall]]:
         """Returns processed output messages for LLM Obs LLM spans.

         Returns:
             - A list of processed messages
-            - A list of tool call data (name, id, args) for span linking purposes
+            - A list of tool calls for span linking purposes
         """
         if not self.response or not self.response.output:
             return [], []

         messages: List[Any] = self.response.output
-        processed: List[Dict[str, Any]] = []
-        tool_call_outputs: List[Tuple[str, str, str]] = []
         if not messages:
-            return processed, tool_call_outputs
+            return [], []

         if not isinstance(messages, list):
             messages = [messages]

-        for item in messages:
-            message = {}
-            # Handle content-based messages
-            if hasattr(item, "content"):
-                text = ""
-                for content in item.content:
-                    if hasattr(content, "text") or hasattr(content, "refusal"):
-                        text += getattr(content, "text", "")
-                        text += getattr(content, "refusal", "")
-                message.update({"role": getattr(item, "role", "assistant"), "content": text})
-            # Handle tool calls
-            elif hasattr(item, "call_id") and hasattr(item, "arguments"):
-                tool_call_outputs.append(
-                    (
-                        item.call_id,
-                        getattr(item, "name", ""),
-                        item.arguments if item.arguments else OAI_HANDOFF_TOOL_ARG,
-                    )
-                )
-                message.update(
-                    {
-                        "tool_calls": [
-                            {
-                                "tool_id": item.call_id,
-                                "arguments": (
-                                    json.loads(item.arguments) if isinstance(item.arguments, str) else item.arguments
-                                ),
-                                "name": getattr(item, "name", ""),
-                                "type": getattr(item, "type", "function"),
-                            }
-                        ]
-                    }
-                )
-            else:
-                message.update({"content": str(item)})
-            processed.append(message)
-
-        return processed, tool_call_outputs
+        return _openai_parse_output_response_messages(messages)

     def llmobs_trace_input(self) -> Optional[str]:
         """Converts Response span data to an input value for top level trace.
@@ -0,0 +1,5 @@
+---
+fixes:
+  - |
+    LLM Observability: Fixes an issue where reasoning message types were not being handled correctly
+    in the OpenAI Agents integration, leading to output messages being dropped on LLM spans.