Skip to content

Commit e3df07b

Browse files
authored
refactor: Simplify method wrapping in AgnoInstrumentor to avoid circu… (#1092)
refactor: Simplify method wrapping in AgnoInstrumentor to avoid circular imports
1 parent 38aa72c commit e3df07b

File tree

1 file changed

+116
-129
lines changed

1 file changed

+116
-129
lines changed

agentops/instrumentation/agentic/agno/instrumentor.py

Lines changed: 116 additions & 129 deletions
Original file line numberDiff line numberDiff line change
@@ -86,37 +86,8 @@ def clear_all(self) -> None:
8686

8787

8888
# Methods to wrap for instrumentation
89-
WRAPPED_METHODS: List[WrapConfig] = [
90-
# Workflow session methods
91-
WrapConfig(
92-
trace_name="agno.workflow.session.load_session",
93-
package="agno.workflow.workflow",
94-
class_name="Workflow",
95-
method_name="load_session",
96-
handler=get_workflow_session_attributes,
97-
),
98-
WrapConfig(
99-
trace_name="agno.workflow.session.new_session",
100-
package="agno.workflow.workflow",
101-
class_name="Workflow",
102-
method_name="new_session",
103-
handler=get_workflow_session_attributes,
104-
),
105-
WrapConfig(
106-
trace_name="agno.workflow.session.read_from_storage",
107-
package="agno.workflow.workflow",
108-
class_name="Workflow",
109-
method_name="read_from_storage",
110-
handler=get_workflow_session_attributes,
111-
),
112-
WrapConfig(
113-
trace_name="agno.workflow.session.write_to_storage",
114-
package="agno.workflow.workflow",
115-
class_name="Workflow",
116-
method_name="write_to_storage",
117-
handler=get_workflow_session_attributes,
118-
),
119-
]
89+
# Empty list - all wrapping will be done in _custom_wrap to avoid circular imports
90+
WRAPPED_METHODS: List[WrapConfig] = []
12091

12192

12293
class StreamingResultWrapper:
@@ -917,113 +888,23 @@ class AgnoInstrumentor(CommonInstrumentor):
917888

918889
def __init__(self):
919890
"""Initialize the Agno instrumentor."""
920-
# Create instrumentor config
891+
self._streaming_context_manager = StreamingContextManager()
892+
893+
# Create instrumentor config with populated wrapped methods
921894
config = InstrumentorConfig(
922895
library_name="agentops.instrumentation.agno",
923896
library_version="0.1.0",
924-
wrapped_methods=[], # We'll populate this in _get_wrapped_methods
897+
wrapped_methods=self._get_initial_wrapped_methods(),
925898
metrics_enabled=True,
926899
dependencies=["agno >= 0.1.0"],
927900
)
928901

929902
super().__init__(config)
930-
self._streaming_context_manager = StreamingContextManager()
931-
932-
def _get_wrapped_methods(self) -> List[WrapConfig]:
933-
"""Return list of methods to be wrapped."""
934-
# Combine standard wrapped methods with custom streaming wraps
935-
wrapped_methods = WRAPPED_METHODS.copy()
936-
937-
# Add streaming method configurations
938-
wrapped_methods.extend(
939-
[
940-
# Streaming agent methods
941-
WrapConfig(
942-
trace_name="agno.agent.run.agent",
943-
package="agno.agent",
944-
class_name="Agent",
945-
method_name="run",
946-
handler=self._create_streaming_agent_wrapper,
947-
),
948-
WrapConfig(
949-
trace_name="agno.agent.run.agent",
950-
package="agno.agent",
951-
class_name="Agent",
952-
method_name="arun",
953-
handler=self._create_streaming_agent_async_wrapper,
954-
),
955-
# Streaming workflow methods
956-
WrapConfig(
957-
trace_name="agno.workflow.run.workflow",
958-
package="agno.workflow.workflow",
959-
class_name="Workflow",
960-
method_name="run_workflow",
961-
handler=self._create_streaming_workflow_wrapper,
962-
),
963-
WrapConfig(
964-
trace_name="agno.workflow.run.workflow",
965-
package="agno.workflow.workflow",
966-
class_name="Workflow",
967-
method_name="arun_workflow",
968-
handler=self._create_streaming_workflow_async_wrapper,
969-
),
970-
# Streaming tool execution
971-
WrapConfig(
972-
trace_name="agno.tool.execute.tool_usage",
973-
package="agno.tools.function",
974-
class_name="FunctionCall",
975-
method_name="execute",
976-
handler=self._create_streaming_tool_wrapper,
977-
),
978-
# Metrics wrapper
979-
WrapConfig(
980-
trace_name="agno.agent.metrics",
981-
package="agno.agent",
982-
class_name="Agent",
983-
method_name="_set_session_metrics",
984-
handler=self._create_metrics_wrapper,
985-
),
986-
# Team methods
987-
WrapConfig(
988-
trace_name="agno.team.run.agent",
989-
package="agno.team.team",
990-
class_name="Team",
991-
method_name="run",
992-
handler=self._create_team_wrapper,
993-
),
994-
WrapConfig(
995-
trace_name="agno.team.run.agent",
996-
package="agno.team.team",
997-
class_name="Team",
998-
method_name="arun",
999-
handler=self._create_team_async_wrapper,
1000-
),
1001-
WrapConfig(
1002-
trace_name="agno.team.run.agent",
1003-
package="agno.team.team",
1004-
class_name="Team",
1005-
method_name="print_response",
1006-
handler=self._create_team_wrapper,
1007-
),
1008-
# Team internal methods with special handling
1009-
WrapConfig(
1010-
trace_name="agno.team.run.workflow",
1011-
package="agno.team.team",
1012-
class_name="Team",
1013-
method_name="_run",
1014-
handler=self._create_team_internal_wrapper,
1015-
),
1016-
WrapConfig(
1017-
trace_name="agno.team.run.workflow",
1018-
package="agno.team.team",
1019-
class_name="Team",
1020-
method_name="_arun",
1021-
handler=self._create_team_internal_async_wrapper,
1022-
),
1023-
]
1024-
)
1025903

1026-
return wrapped_methods
904+
def _get_initial_wrapped_methods(self) -> List[WrapConfig]:
905+
"""Return list of methods to be wrapped during initialization."""
906+
# Only return the standard wrapped methods that don't need custom wrappers
907+
return WRAPPED_METHODS.copy()
1027908

1028909
def _create_metrics(self, meter: Meter) -> Dict[str, Any]:
1029910
"""Create metrics for the instrumentor.
@@ -1036,6 +917,112 @@ def _create_metrics(self, meter: Meter) -> Dict[str, Any]:
1036917
def _initialize(self, **kwargs):
1037918
"""Perform custom initialization."""
1038919
logger.info("Agno instrumentation installed successfully")
920+
# Schedule wrapping to happen after imports are complete
921+
import threading
922+
923+
threading.Timer(0.1, self._delayed_wrap).start()
924+
925+
def _delayed_wrap(self):
926+
"""Perform wrapping after a delay to avoid circular imports."""
927+
try:
928+
self._perform_wrapping()
929+
except Exception as e:
930+
logger.error(f"Failed to perform delayed wrapping: {e}")
931+
932+
def _custom_wrap(self, **kwargs):
933+
"""Skip custom wrapping during initialization - it will be done in _delayed_wrap."""
934+
pass
935+
936+
def _perform_wrapping(self):
937+
"""Actually perform the wrapping - called after imports are complete."""
938+
if not self._tracer:
939+
logger.debug("No tracer available for Agno wrapping")
940+
return
941+
942+
from agentops.instrumentation.common.wrappers import wrap_function_wrapper, WrapConfig, wrap
943+
944+
# Import Agno modules now that they should be fully loaded
945+
try:
946+
import agno.agent
947+
import agno.workflow.workflow
948+
import agno.tools.function
949+
import agno.team.team # Noqa: F401
950+
except ImportError as e:
951+
logger.error(f"Failed to import Agno modules for wrapping: {e}")
952+
return
953+
954+
# First wrap the standard workflow session methods using the standard wrapper
955+
session_methods = [
956+
WrapConfig(
957+
trace_name="agno.workflow.session.load_session",
958+
package="agno.workflow.workflow",
959+
class_name="Workflow",
960+
method_name="load_session",
961+
handler=get_workflow_session_attributes,
962+
),
963+
WrapConfig(
964+
trace_name="agno.workflow.session.new_session",
965+
package="agno.workflow.workflow",
966+
class_name="Workflow",
967+
method_name="new_session",
968+
handler=get_workflow_session_attributes,
969+
),
970+
WrapConfig(
971+
trace_name="agno.workflow.session.read_from_storage",
972+
package="agno.workflow.workflow",
973+
class_name="Workflow",
974+
method_name="read_from_storage",
975+
handler=get_workflow_session_attributes,
976+
),
977+
WrapConfig(
978+
trace_name="agno.workflow.session.write_to_storage",
979+
package="agno.workflow.workflow",
980+
class_name="Workflow",
981+
method_name="write_to_storage",
982+
handler=get_workflow_session_attributes,
983+
),
984+
]
985+
986+
wrapped_count = 0
987+
for wrap_config in session_methods:
988+
try:
989+
wrap(wrap_config, self._tracer)
990+
wrapped_count += 1
991+
except Exception as e:
992+
logger.debug(f"Failed to wrap {wrap_config}: {e}")
993+
994+
# Now wrap the streaming methods that need custom wrappers
995+
streaming_methods = [
996+
# Streaming agent methods
997+
("agno.agent", "Agent.run", self._create_streaming_agent_wrapper()),
998+
("agno.agent", "Agent.arun", self._create_streaming_agent_async_wrapper()),
999+
# Streaming workflow methods
1000+
("agno.workflow.workflow", "Workflow.run_workflow", self._create_streaming_workflow_wrapper()),
1001+
("agno.workflow.workflow", "Workflow.arun_workflow", self._create_streaming_workflow_async_wrapper()),
1002+
# Streaming tool execution
1003+
("agno.tools.function", "FunctionCall.execute", self._create_streaming_tool_wrapper()),
1004+
# Metrics wrapper
1005+
("agno.agent", "Agent._set_session_metrics", self._create_metrics_wrapper()),
1006+
# Team methods
1007+
("agno.team.team", "Team.run", self._create_team_wrapper()),
1008+
("agno.team.team", "Team.arun", self._create_team_async_wrapper()),
1009+
("agno.team.team", "Team.print_response", self._create_team_wrapper()),
1010+
# Team internal methods with special handling
1011+
("agno.team.team", "Team._run", self._create_team_internal_wrapper()),
1012+
("agno.team.team", "Team._arun", self._create_team_internal_async_wrapper()),
1013+
]
1014+
1015+
for package, method, wrapper in streaming_methods:
1016+
try:
1017+
wrap_function_wrapper(package, method, wrapper)
1018+
wrapped_count += 1
1019+
except Exception as e:
1020+
logger.debug(f"Failed to wrap {package}.{method}: {e}")
1021+
1022+
if wrapped_count > 0:
1023+
logger.info(f"Agno instrumentation: Successfully wrapped {wrapped_count} methods")
1024+
else:
1025+
logger.warning("Agno instrumentation: No methods were successfully wrapped")
10391026

10401027
def _custom_unwrap(self, **kwargs):
10411028
"""Perform custom unwrapping."""

0 commit comments

Comments
 (0)