Skip to content

Commit 6559753

Browse files
authored
fix instrumentation (#1123)
fix
1 parent 7b50c9c commit 6559753

File tree

1 file changed

+78
-164
lines changed

1 file changed

+78
-164
lines changed

agentops/instrumentation/__init__.py

Lines changed: 78 additions & 164 deletions
Original file line numberDiff line numberDiff line change
@@ -75,24 +75,6 @@ class InstrumentorConfig(TypedDict):
7575
},
7676
}
7777

78-
# Configuration for utility instrumentors
79-
UTILITY_INSTRUMENTORS: dict[str, InstrumentorConfig] = {
80-
"concurrent.futures": {
81-
"module_name": "agentops.instrumentation.utilities.concurrent_futures",
82-
"class_name": "ConcurrentFuturesInstrumentor",
83-
"min_version": "3.7.0", # Python 3.7+ (concurrent.futures is stdlib)
84-
"package_name": "python", # Special case for stdlib modules
85-
},
86-
}
87-
88-
# Define which packages require which utility instrumentors
89-
# This maps package names to the list of utility instrumentors they depend on
90-
UTILITY_DEPENDENCIES: dict[str, list[str]] = {
91-
"mem0": ["concurrent.futures"], # mem0 uses concurrent.futures for parallel processing
92-
# Add more dependencies as needed in the future
93-
# "langchain": ["concurrent.futures", "asyncio"],
94-
}
95-
9678
# Configuration for supported agentic libraries
9779
AGENTIC_LIBRARIES: dict[str, InstrumentorConfig] = {
9880
"crewai": {
@@ -133,7 +115,7 @@ class InstrumentorConfig(TypedDict):
133115
}
134116

135117
# Combine all target packages for monitoring
136-
TARGET_PACKAGES = set(PROVIDERS.keys()) | set(AGENTIC_LIBRARIES.keys()) | set(UTILITY_INSTRUMENTORS.keys())
118+
TARGET_PACKAGES = set(PROVIDERS.keys()) | set(AGENTIC_LIBRARIES.keys())
137119

138120
# Create a single instance of the manager
139121
# _manager = InstrumentationManager() # Removed
@@ -143,7 +125,6 @@ class InstrumentorConfig(TypedDict):
143125
_original_builtins_import = builtins.__import__ # Store original import
144126
_instrumenting_packages: Set[str] = set()
145127
_has_agentic_library: bool = False
146-
_pending_utility_instrumentation: Set[str] = set() # Track packages that need utility instrumentation
147128

148129

149130
# New helper function to check module origin
@@ -153,16 +134,6 @@ def _is_installed_package(module_obj: ModuleType, package_name_key: str) -> bool
153134
rather than a local module, especially when names might collide.
154135
`package_name_key` is the key from TARGET_PACKAGES (e.g., 'agents', 'google.adk').
155136
"""
156-
# Special case for stdlib modules (marked with package_name="python" in UTILITY_INSTRUMENTORS)
157-
if (
158-
package_name_key in UTILITY_INSTRUMENTORS
159-
and UTILITY_INSTRUMENTORS[package_name_key].get("package_name") == "python"
160-
):
161-
logger.debug(
162-
f"_is_installed_package: Module '{package_name_key}' is a Python standard library module. Considering it an installed package."
163-
)
164-
return True
165-
166137
if not hasattr(module_obj, "__file__") or not module_obj.__file__:
167138
logger.debug(
168139
f"_is_installed_package: Module '{package_name_key}' has no __file__, assuming it might be an SDK namespace package. Returning True."
@@ -255,7 +226,7 @@ def _uninstrument_providers():
255226
def _should_instrument_package(package_name: str) -> bool:
256227
"""
257228
Determine if a package should be instrumented based on current state.
258-
Handles special cases for agentic libraries, providers, and utility instrumentors.
229+
Handles special cases for agentic libraries and providers.
259230
"""
260231
global _has_agentic_library
261232

@@ -264,22 +235,6 @@ def _should_instrument_package(package_name: str) -> bool:
264235
logger.debug(f"_should_instrument_package: '{package_name}' already instrumented by AgentOps. Skipping.")
265236
return False
266237

267-
# Utility instrumentors should only be instrumented when their dependent packages are active
268-
if package_name in UTILITY_INSTRUMENTORS:
269-
# Check if any package that depends on this utility is instrumented
270-
for dependent_package, utilities in UTILITY_DEPENDENCIES.items():
271-
if package_name in utilities and _is_package_instrumented(dependent_package):
272-
logger.debug(
273-
f"_should_instrument_package: '{package_name}' is a utility instrumentor needed by '{dependent_package}'. Allowing."
274-
)
275-
return True
276-
277-
logger.debug(
278-
f"_should_instrument_package: '{package_name}' is a utility instrumentor but no dependent packages are active. Skipping."
279-
)
280-
return False
281-
282-
# Only apply agentic/provider logic if it's NOT a utility instrumentor
283238
is_target_agentic = package_name in AGENTIC_LIBRARIES
284239
is_target_provider = package_name in PROVIDERS
285240

@@ -321,136 +276,88 @@ def _should_instrument_package(package_name: str) -> bool:
321276
return False
322277

323278

324-
def _instrument_utility_dependencies(package_name: str):
325-
"""
326-
Instrument any utility dependencies required by the given package.
327-
328-
Args:
329-
package_name: The package that was just instrumented
330-
"""
331-
if package_name in UTILITY_DEPENDENCIES:
332-
utilities_needed = UTILITY_DEPENDENCIES[package_name]
333-
logger.debug(
334-
f"_instrument_utility_dependencies: Package '{package_name}' requires utilities: {utilities_needed}"
335-
)
336-
337-
for utility_name in utilities_needed:
338-
if utility_name in UTILITY_INSTRUMENTORS and not _is_package_instrumented(utility_name):
339-
logger.info(f"AgentOps: Instrumenting utility '{utility_name}' required by '{package_name}'")
340-
341-
# Check if the utility module is available
342-
if utility_name in sys.modules:
343-
_perform_instrumentation(utility_name)
344-
else:
345-
logger.debug(
346-
f"_instrument_utility_dependencies: Utility '{utility_name}' not yet imported, will instrument when imported"
347-
)
348-
349-
350279
def _perform_instrumentation(package_name: str):
351280
"""Helper function to perform instrumentation for a given package."""
352281
global _instrumenting_packages, _active_instrumentors, _has_agentic_library
353-
354-
# Check if we're already instrumenting this package (prevent circular instrumentation)
355-
if package_name in _instrumenting_packages:
356-
logger.debug(
357-
f"_perform_instrumentation: Already instrumenting '{package_name}', skipping to prevent circular instrumentation"
358-
)
359-
return
360-
361282
if not _should_instrument_package(package_name):
362283
return
363284

364285
# Get the appropriate configuration for the package
365-
# Ensure package_name is a key in either PROVIDERS, AGENTIC_LIBRARIES, or UTILITY_INSTRUMENTORS
366-
if (
367-
package_name not in PROVIDERS
368-
and package_name not in AGENTIC_LIBRARIES
369-
and package_name not in UTILITY_INSTRUMENTORS
370-
):
286+
# Ensure package_name is a key in either PROVIDERS or AGENTIC_LIBRARIES
287+
if package_name not in PROVIDERS and package_name not in AGENTIC_LIBRARIES:
371288
logger.debug(
372-
f"_perform_instrumentation: Package '{package_name}' not found in PROVIDERS, AGENTIC_LIBRARIES, or UTILITY_INSTRUMENTORS. Skipping."
289+
f"_perform_instrumentation: Package '{package_name}' not found in PROVIDERS or AGENTIC_LIBRARIES. Skipping."
373290
)
374291
return
375292

376-
config = PROVIDERS.get(package_name) or AGENTIC_LIBRARIES.get(package_name) or UTILITY_INSTRUMENTORS[package_name]
293+
config = PROVIDERS.get(package_name) or AGENTIC_LIBRARIES.get(package_name)
377294
loader = InstrumentorLoader(**config)
378295

379-
# Add to _instrumenting_packages to prevent circular instrumentation
380-
_instrumenting_packages.add(package_name)
381-
382-
try:
383-
# instrument_one already checks loader.should_activate
384-
instrumentor_instance = instrument_one(loader)
385-
if instrumentor_instance is not None:
386-
# Check if it was *actually* instrumented by instrument_one by seeing if the instrument method was called successfully.
387-
# This relies on instrument_one returning None if its internal .instrument() call failed (if we revert that, this needs adjustment)
388-
# For now, assuming instrument_one returns instance only on full success.
389-
# User request was to return instrumentor even if .instrument() fails. So, we check if _agentops_instrumented_package_key was set by us.
390-
391-
# Let's assume instrument_one might return an instance whose .instrument() failed.
392-
# The key is set before _active_instrumentors.append, so if it's already there and matches, it means it's a re-attempt on the same package.
393-
# The _is_package_instrumented check at the start of _should_instrument_package should prevent most re-entry for the same package_name.
394-
395-
# Store the package key this instrumentor is for, to aid _is_package_instrumented
396-
instrumentor_instance._agentops_instrumented_package_key = package_name
397-
398-
# Add to active_instrumentors only if it's not a duplicate in terms of package_key being instrumented
399-
# This is a safeguard, _is_package_instrumented should catch this earlier.
400-
is_newly_added = True
401-
for existing_inst in _active_instrumentors:
402-
if (
403-
hasattr(existing_inst, "_agentops_instrumented_package_key")
404-
and existing_inst._agentops_instrumented_package_key == package_name
405-
):
406-
is_newly_added = False
407-
logger.debug(
408-
f"_perform_instrumentation: Instrumentor for '{package_name}' already in _active_instrumentors. Not adding again."
409-
)
410-
break
411-
if is_newly_added:
412-
_active_instrumentors.append(instrumentor_instance)
413-
414-
# If this was an agentic library AND it's newly effectively instrumented.
296+
# instrument_one already checks loader.should_activate
297+
instrumentor_instance = instrument_one(loader)
298+
if instrumentor_instance is not None:
299+
# Check if it was *actually* instrumented by instrument_one by seeing if the instrument method was called successfully.
300+
# This relies on instrument_one returning None if its internal .instrument() call failed (if we revert that, this needs adjustment)
301+
# For now, assuming instrument_one returns instance only on full success.
302+
# User request was to return instrumentor even if .instrument() fails. So, we check if _agentops_instrumented_package_key was set by us.
303+
304+
# Let's assume instrument_one might return an instance whose .instrument() failed.
305+
# The key is set before _active_instrumentors.append, so if it's already there and matches, it means it's a re-attempt on the same package.
306+
# The _is_package_instrumented check at the start of _should_instrument_package should prevent most re-entry for the same package_name.
307+
308+
# Store the package key this instrumentor is for, to aid _is_package_instrumented
309+
instrumentor_instance._agentops_instrumented_package_key = package_name
310+
311+
# Add to active_instrumentors only if it's not a duplicate in terms of package_key being instrumented
312+
# This is a safeguard, _is_package_instrumented should catch this earlier.
313+
is_newly_added = True
314+
for existing_inst in _active_instrumentors:
415315
if (
416-
package_name in AGENTIC_LIBRARIES and not _has_agentic_library
417-
): # Check _has_agentic_library to ensure this is the *first* one.
418-
# _uninstrument_providers() was already called in _should_instrument_package for the first agentic library.
419-
_has_agentic_library = True
420-
421-
# Mark package for utility dependency instrumentation
422-
# We defer this to avoid circular imports during package initialization
423-
if package_name not in UTILITY_INSTRUMENTORS and is_newly_added: # Don't recursively instrument utilities
424-
if package_name in UTILITY_DEPENDENCIES:
425-
_pending_utility_instrumentation.add(package_name)
426-
logger.debug(
427-
f"_perform_instrumentation: Marked '{package_name}' for deferred utility instrumentation"
428-
)
429-
else:
430-
logger.debug(
431-
f"_perform_instrumentation: instrument_one for '{package_name}' returned None. Not added to active instrumentors."
432-
)
433-
finally:
434-
# Always remove from _instrumenting_packages when done
435-
_instrumenting_packages.discard(package_name)
436-
316+
hasattr(existing_inst, "_agentops_instrumented_package_key")
317+
and existing_inst._agentops_instrumented_package_key == package_name
318+
):
319+
is_newly_added = False
320+
logger.debug(
321+
f"_perform_instrumentation: Instrumentor for '{package_name}' already in _active_instrumentors. Not adding again."
322+
)
323+
break
324+
if is_newly_added:
325+
_active_instrumentors.append(instrumentor_instance)
437326

438-
def _process_pending_utility_instrumentation():
439-
"""Process any pending utility instrumentations."""
440-
global _pending_utility_instrumentation
327+
# If this was an agentic library AND it's newly effectively instrumented.
328+
if (
329+
package_name in AGENTIC_LIBRARIES and not _has_agentic_library
330+
): # Check _has_agentic_library to ensure this is the *first* one.
331+
# _uninstrument_providers() was already called in _should_instrument_package for the first agentic library.
332+
_has_agentic_library = True
441333

442-
if not _pending_utility_instrumentation:
443-
return
334+
# Special case: If mem0 is instrumented, also instrument concurrent.futures
335+
if package_name == "mem0" and is_newly_added:
336+
try:
337+
# Check if concurrent.futures module is available
338+
339+
# Create config for concurrent.futures instrumentor
340+
concurrent_config = InstrumentorConfig(
341+
module_name="agentops.instrumentation.utilities.concurrent_futures",
342+
class_name="ConcurrentFuturesInstrumentor",
343+
min_version="3.7.0", # Python 3.7+ (concurrent.futures is stdlib)
344+
package_name="python", # Special case for stdlib modules
345+
)
444346

445-
# Copy and clear to avoid modifying during iteration
446-
pending = _pending_utility_instrumentation.copy()
447-
_pending_utility_instrumentation.clear()
347+
# Create and instrument concurrent.futures
348+
concurrent_loader = InstrumentorLoader(**concurrent_config)
349+
concurrent_instrumentor = instrument_one(concurrent_loader)
448350

449-
for package_name in pending:
450-
try:
451-
_instrument_utility_dependencies(package_name)
452-
except Exception as e:
453-
logger.debug(f"Error instrumenting utility dependencies for {package_name}: {e}")
351+
if concurrent_instrumentor is not None:
352+
concurrent_instrumentor._agentops_instrumented_package_key = "concurrent.futures"
353+
_active_instrumentors.append(concurrent_instrumentor)
354+
logger.info("AgentOps: Instrumented concurrent.futures as a dependency of mem0.")
355+
except Exception as e:
356+
logger.debug(f"Could not instrument concurrent.futures for mem0: {e}")
357+
else:
358+
logger.debug(
359+
f"_perform_instrumentation: instrument_one for '{package_name}' returned None. Not added to active instrumentors."
360+
)
454361

455362

456363
def _import_monitor(name: str, globals_dict=None, locals_dict=None, fromlist=(), level=0):
@@ -460,9 +367,6 @@ def _import_monitor(name: str, globals_dict=None, locals_dict=None, fromlist=(),
460367
"""
461368
global _instrumenting_packages, _has_agentic_library
462369

463-
# Process any pending utility instrumentations before handling new imports
464-
_process_pending_utility_instrumentation()
465-
466370
# If an agentic library is already instrumented, skip all further instrumentation
467371
if _has_agentic_library:
468372
return _original_builtins_import(name, globals_dict, locals_dict, fromlist, level)
@@ -503,7 +407,7 @@ def _import_monitor(name: str, globals_dict=None, locals_dict=None, fromlist=(),
503407

504408
# Instrument all matching packages
505409
for package_to_check in packages_to_check:
506-
if not _is_package_instrumented(package_to_check):
410+
if package_to_check not in _instrumenting_packages and not _is_package_instrumented(package_to_check):
507411
target_module_obj = sys.modules.get(package_to_check)
508412

509413
if target_module_obj:
@@ -518,13 +422,16 @@ def _import_monitor(name: str, globals_dict=None, locals_dict=None, fromlist=(),
518422
f"_import_monitor: No module object found in sys.modules for '{package_to_check}', proceeding with SDK instrumentation attempt."
519423
)
520424

425+
_instrumenting_packages.add(package_to_check)
521426
try:
522427
_perform_instrumentation(package_to_check)
523428
# If we just instrumented an agentic library, stop
524429
if _has_agentic_library:
525430
break
526431
except Exception as e:
527432
logger.error(f"Error instrumenting {package_to_check}: {str(e)}")
433+
finally:
434+
_instrumenting_packages.discard(package_to_check)
528435

529436
return module
530437

@@ -632,7 +539,11 @@ def instrument_all():
632539
package_to_check = target
633540
break
634541

635-
if package_to_check and not _is_package_instrumented(package_to_check):
542+
if (
543+
package_to_check
544+
and package_to_check not in _instrumenting_packages
545+
and not _is_package_instrumented(package_to_check)
546+
):
636547
target_module_obj = sys.modules.get(package_to_check)
637548

638549
if target_module_obj:
@@ -644,10 +555,13 @@ def instrument_all():
644555
f"instrument_all: No module object found for '{package_to_check}' in sys.modules during startup scan. Proceeding cautiously."
645556
)
646557

558+
_instrumenting_packages.add(package_to_check)
647559
try:
648560
_perform_instrumentation(package_to_check)
649561
except Exception as e:
650562
logger.error(f"Error instrumenting {package_to_check}: {str(e)}")
563+
finally:
564+
_instrumenting_packages.discard(package_to_check)
651565

652566

653567
def uninstrument_all():

0 commit comments

Comments
 (0)