Skip to content

Commit 0c10b4e

Browse files
committed
opentelemetry-instrumentation-dbapi: instrument commit and rollback
1 parent ccea42c commit 0c10b4e

File tree

12 files changed

+389
-55
lines changed

12 files changed

+389
-55
lines changed

instrumentation/opentelemetry-instrumentation-dbapi/src/opentelemetry/instrumentation/dbapi/__init__.py

Lines changed: 131 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,7 @@ def trace_integration(
209209
db_api_integration_factory: type[DatabaseApiIntegration] | None = None,
210210
enable_attribute_commenter: bool = False,
211211
commenter_options: dict[str, Any] | None = None,
212+
enable_transaction_spans: bool = True,
212213
):
213214
"""Integrate with DB API library.
214215
https://www.python.org/dev/peps/pep-0249/
@@ -228,6 +229,7 @@ def trace_integration(
228229
default one is used.
229230
enable_attribute_commenter: Flag to enable/disable sqlcomment inclusion in `db.statement` span attribute. Only available if enable_commenter=True.
230231
commenter_options: Configurations for tags to be appended at the sql query.
232+
enable_transaction_spans: Flag to enable/disable transaction spans (commit/rollback). Defaults to True.
231233
"""
232234
wrap_connect(
233235
__name__,
@@ -242,6 +244,7 @@ def trace_integration(
242244
db_api_integration_factory=db_api_integration_factory,
243245
enable_attribute_commenter=enable_attribute_commenter,
244246
commenter_options=commenter_options,
247+
enable_transaction_spans=enable_transaction_spans,
245248
)
246249

247250

@@ -258,6 +261,7 @@ def wrap_connect(
258261
db_api_integration_factory: type[DatabaseApiIntegration] | None = None,
259262
commenter_options: dict[str, Any] | None = None,
260263
enable_attribute_commenter: bool = False,
264+
enable_transaction_spans: bool = True,
261265
):
262266
"""Integrate with DB API library.
263267
https://www.python.org/dev/peps/pep-0249/
@@ -277,6 +281,7 @@ def wrap_connect(
277281
default one is used.
278282
commenter_options: Configurations for tags to be appended at the sql query.
279283
enable_attribute_commenter: Flag to enable/disable sqlcomment inclusion in `db.statement` span attribute. Only available if enable_commenter=True.
284+
enable_transaction_spans: Flag to enable/disable transaction spans (commit/rollback). Defaults to True.
280285
281286
"""
282287
db_api_integration_factory = (
@@ -301,6 +306,7 @@ def wrap_connect_(
301306
commenter_options=commenter_options,
302307
connect_module=connect_module,
303308
enable_attribute_commenter=enable_attribute_commenter,
309+
enable_transaction_spans=enable_transaction_spans,
304310
)
305311
return db_integration.wrapped_connection(wrapped, args, kwargs)
306312

@@ -338,6 +344,7 @@ def instrument_connection(
338344
connect_module: Callable[..., Any] | None = None,
339345
enable_attribute_commenter: bool = False,
340346
db_api_integration_factory: type[DatabaseApiIntegration] | None = None,
347+
enable_transaction_spans: bool = True,
341348
) -> TracedConnectionProxy[ConnectionT]:
342349
"""Enable instrumentation in a database connection.
343350
@@ -359,6 +366,7 @@ def instrument_connection(
359366
replacement for :class:`DatabaseApiIntegration`. Can be used to
360367
obtain connection attributes from the connect method instead of
361368
from the connection itself (as done by the pymssql intrumentor).
369+
enable_transaction_spans: Flag to enable/disable transaction spans (commit/rollback). Defaults to True.
362370
363371
Returns:
364372
An instrumented connection.
@@ -382,6 +390,7 @@ def instrument_connection(
382390
commenter_options=commenter_options,
383391
connect_module=connect_module,
384392
enable_attribute_commenter=enable_attribute_commenter,
393+
enable_transaction_spans=enable_transaction_spans,
385394
)
386395
db_integration.get_connection_attributes(connection)
387396
return get_traced_connection_proxy(connection, db_integration)
@@ -418,6 +427,7 @@ def __init__(
418427
commenter_options: dict[str, Any] | None = None,
419428
connect_module: Callable[..., Any] | None = None,
420429
enable_attribute_commenter: bool = False,
430+
enable_transaction_spans: bool = True,
421431
):
422432
if connection_attributes is None:
423433
self.connection_attributes = {
@@ -440,6 +450,7 @@ def __init__(
440450
self.enable_commenter = enable_commenter
441451
self.commenter_options = commenter_options
442452
self.enable_attribute_commenter = enable_attribute_commenter
453+
self.enable_transaction_spans = enable_transaction_spans
443454
self.database_system = database_system
444455
self.connection_props: dict[str, Any] = {}
445456
self.span_attributes: dict[str, Any] = {}
@@ -555,6 +566,17 @@ def get_connection_attributes(self, connection: object) -> None:
555566
if port is not None:
556567
self.span_attributes[SpanAttributes.NET_PEER_PORT] = port
557568

569+
def populate_common_span_attributes(self, span: trace_api.Span) -> None:
570+
"""Populate span with common database connection attributes."""
571+
if not span.is_recording():
572+
return
573+
574+
span.set_attribute(SpanAttributes.DB_SYSTEM, self.database_system)
575+
span.set_attribute(SpanAttributes.DB_NAME, self.database)
576+
577+
for attribute_key, attribute_value in self.span_attributes.items():
578+
span.set_attribute(attribute_key, attribute_value)
579+
558580

559581
# pylint: disable=abstract-method
560582
class TracedConnectionProxy(wrapt.ObjectProxy, Generic[ConnectionT]):
@@ -563,23 +585,54 @@ def __init__(
563585
self,
564586
connection: ConnectionT,
565587
db_api_integration: DatabaseApiIntegration | None = None,
588+
wrap_cursors: bool = True,
566589
):
567590
wrapt.ObjectProxy.__init__(self, connection)
568591
self._self_db_api_integration = db_api_integration
592+
self._self_wrap_cursors = wrap_cursors
569593

570594
def __getattribute__(self, name: str):
571-
if object.__getattribute__(self, name):
595+
# Try to get the attribute from the proxy first
596+
try:
572597
return object.__getattribute__(self, name)
573-
574-
return object.__getattribute__(
575-
object.__getattribute__(self, "_connection"), name
576-
)
598+
except AttributeError:
599+
# If not found on proxy, try the wrapped connection
600+
return object.__getattribute__(
601+
object.__getattribute__(self, "__wrapped__"), name
602+
)
577603

578604
def cursor(self, *args: Any, **kwargs: Any):
579-
return get_traced_cursor_proxy(
580-
self.__wrapped__.cursor(*args, **kwargs),
581-
self._self_db_api_integration,
582-
)
605+
cursor = self.__wrapped__.cursor(*args, **kwargs)
606+
607+
# For databases like psycopg/psycopg2 that use cursor_factory,
608+
# cursor tracing is already handled by the factory, so skip wrapping
609+
if not self._self_wrap_cursors:
610+
return cursor
611+
612+
# For standard dbapi connections, wrap the cursor
613+
return get_traced_cursor_proxy(cursor, self._self_db_api_integration)
614+
615+
def _traced_tx_operation(
616+
self, operation_name: str, operation_method: Callable[[], None]
617+
) -> None:
618+
"""Execute a traced transaction operation (commit, rollback)."""
619+
if not is_instrumentation_enabled():
620+
return operation_method()
621+
622+
if not self._self_db_api_integration.enable_transaction_spans:
623+
return operation_method()
624+
625+
with self._self_db_api_integration._tracer.start_as_current_span(
626+
operation_name, kind=trace_api.SpanKind.CLIENT
627+
) as span:
628+
self._self_db_api_integration.populate_common_span_attributes(span)
629+
return operation_method()
630+
631+
def commit(self):
632+
return self._traced_tx_operation("COMMIT", self.__wrapped__.commit)
633+
634+
def rollback(self):
635+
return self._traced_tx_operation("ROLLBACK", self.__wrapped__.rollback)
583636

584637
def __enter__(self):
585638
self.__wrapped__.__enter__()
@@ -589,13 +642,78 @@ def __exit__(self, *args: Any, **kwargs: Any):
589642
self.__wrapped__.__exit__(*args, **kwargs)
590643

591644

645+
class AsyncTracedConnectionProxy(TracedConnectionProxy[ConnectionT]):
646+
async def _traced_tx_operation_async(
647+
self, operation_name: str, operation_method: Callable[[], Awaitable[None]]
648+
) -> None:
649+
"""Execute a traced async transaction operation (commit, rollback)."""
650+
if not is_instrumentation_enabled():
651+
return await operation_method()
652+
653+
if not self._self_db_api_integration.enable_transaction_spans:
654+
return await operation_method()
655+
656+
with self._self_db_api_integration._tracer.start_as_current_span(
657+
operation_name, kind=trace_api.SpanKind.CLIENT
658+
) as span:
659+
self._self_db_api_integration.populate_common_span_attributes(span)
660+
return await operation_method()
661+
662+
async def commit(self):
663+
"""Async commit for async connections (e.g., psycopg.AsyncConnection)."""
664+
return await self._traced_tx_operation_async("COMMIT", self.__wrapped__.commit)
665+
666+
async def rollback(self):
667+
"""Async rollback for async connections (e.g., psycopg.AsyncConnection)."""
668+
return await self._traced_tx_operation_async("ROLLBACK", self.__wrapped__.rollback)
669+
670+
# Async context manager support
671+
async def __aenter__(self):
672+
if hasattr(self.__wrapped__, "__aenter__"):
673+
await self.__wrapped__.__aenter__()
674+
return self
675+
676+
async def __aexit__(self, *args: Any, **kwargs: Any):
677+
if hasattr(self.__wrapped__, "__aexit__"):
678+
return await self.__wrapped__.__aexit__(*args, **kwargs)
679+
680+
592681
def get_traced_connection_proxy(
593682
connection: ConnectionT,
594683
db_api_integration: DatabaseApiIntegration | None,
684+
wrap_cursors: bool = True,
595685
*args: Any,
596686
**kwargs: Any,
597687
) -> TracedConnectionProxy[ConnectionT]:
598-
return TracedConnectionProxy(connection, db_api_integration)
688+
"""Get a traced connection proxy for sync connections.
689+
690+
Args:
691+
connection: The database connection to wrap.
692+
db_api_integration: The database API integration instance.
693+
wrap_cursors: Whether to wrap cursors returned by connection.cursor().
694+
Set to False for databases like psycopg/psycopg2 that handle cursor
695+
tracing via cursor_factory. Defaults to True.
696+
"""
697+
return TracedConnectionProxy(connection, db_api_integration, wrap_cursors)
698+
699+
700+
def get_traced_async_connection_proxy(
701+
connection: ConnectionT,
702+
db_api_integration: DatabaseApiIntegration | None,
703+
wrap_cursors: bool = True,
704+
*args: Any,
705+
**kwargs: Any,
706+
) -> AsyncTracedConnectionProxy[ConnectionT]:
707+
"""Get a traced connection proxy for async connections.
708+
709+
Args:
710+
connection: The async database connection to wrap.
711+
db_api_integration: The database API integration instance.
712+
wrap_cursors: Whether to wrap cursors returned by connection.cursor().
713+
Set to False for databases like psycopg/psycopg2 that handle cursor
714+
tracing via cursor_factory. Defaults to True.
715+
"""
716+
return AsyncTracedConnectionProxy(connection, db_api_integration, wrap_cursors)
599717

600718

601719
class CursorTracer(Generic[CursorT]):
@@ -666,21 +784,12 @@ def _populate_span(
666784
):
667785
if not span.is_recording():
668786
return
787+
788+
self._db_api_integration.populate_common_span_attributes(span)
789+
669790
statement = self.get_statement(cursor, args)
670-
span.set_attribute(
671-
SpanAttributes.DB_SYSTEM, self._db_api_integration.database_system
672-
)
673-
span.set_attribute(
674-
SpanAttributes.DB_NAME, self._db_api_integration.database
675-
)
676791
span.set_attribute(SpanAttributes.DB_STATEMENT, statement)
677792

678-
for (
679-
attribute_key,
680-
attribute_value,
681-
) in self._db_api_integration.span_attributes.items():
682-
span.set_attribute(attribute_key, attribute_value)
683-
684793
if self._db_api_integration.capture_parameters and len(args) > 1:
685794
span.set_attribute("db.statement.parameters", str(args[1]))
686795

instrumentation/opentelemetry-instrumentation-dbapi/tests/test_dbapi_integration.py

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1055,6 +1055,62 @@ def test_callproc(self):
10551055
"Test stored procedure",
10561056
)
10571057

1058+
def test_commit(self):
1059+
db_integration = dbapi.DatabaseApiIntegration(
1060+
"instrumenting_module_test_name", "testcomponent"
1061+
)
1062+
mock_connection = db_integration.wrapped_connection(
1063+
mock_connect, {}, {}
1064+
)
1065+
mock_connection.commit()
1066+
spans_list = self.memory_exporter.get_finished_spans()
1067+
self.assertEqual(len(spans_list), 1)
1068+
span = spans_list[0]
1069+
self.assertEqual(span.name, "COMMIT")
1070+
1071+
def test_rollback(self):
1072+
db_integration = dbapi.DatabaseApiIntegration(
1073+
"instrumenting_module_test_name", "testcomponent"
1074+
)
1075+
mock_connection = db_integration.wrapped_connection(
1076+
mock_connect, {}, {}
1077+
)
1078+
mock_connection.rollback()
1079+
spans_list = self.memory_exporter.get_finished_spans()
1080+
self.assertEqual(len(spans_list), 1)
1081+
span = spans_list[0]
1082+
self.assertEqual(span.name, "ROLLBACK")
1083+
1084+
def test_commit_with_suppress_instrumentation(self):
1085+
"""Test that commit doesn't create a span when instrumentation is suppressed"""
1086+
db_integration = dbapi.DatabaseApiIntegration(
1087+
"instrumenting_module_test_name",
1088+
"testcomponent",
1089+
)
1090+
mock_connection = db_integration.wrapped_connection(
1091+
mock_connect, {}, {}
1092+
)
1093+
with suppress_instrumentation():
1094+
mock_connection.commit()
1095+
1096+
spans_list = self.memory_exporter.get_finished_spans()
1097+
self.assertEqual(len(spans_list), 0)
1098+
1099+
def test_rollback_with_suppress_instrumentation(self):
1100+
"""Test that rollback doesn't create a span when instrumentation is suppressed"""
1101+
db_integration = dbapi.DatabaseApiIntegration(
1102+
"instrumenting_module_test_name",
1103+
"testcomponent",
1104+
)
1105+
mock_connection = db_integration.wrapped_connection(
1106+
mock_connect, {}, {}
1107+
)
1108+
with suppress_instrumentation():
1109+
mock_connection.rollback()
1110+
1111+
spans_list = self.memory_exporter.get_finished_spans()
1112+
self.assertEqual(len(spans_list), 0)
1113+
10581114
@mock.patch("opentelemetry.instrumentation.dbapi")
10591115
def test_wrap_connect(self, mock_dbapi):
10601116
dbapi.wrap_connect(self.tracer, mock_dbapi, "connect", "-")
@@ -1209,6 +1265,14 @@ def __init__(self, database, server_port, server_host, user):
12091265
def cursor(self):
12101266
return MockCursor()
12111267

1268+
# pylint: disable=no-self-use
1269+
def commit(self):
1270+
pass
1271+
1272+
# pylint: disable=no-self-use
1273+
def rollback(self):
1274+
pass
1275+
12121276

12131277
class MockCursor:
12141278
def __init__(self) -> None:

instrumentation/opentelemetry-instrumentation-mysql/src/opentelemetry/instrumentation/mysql/__init__.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,7 @@ def _instrument(self, **kwargs):
182182
enable_attribute_commenter = kwargs.get(
183183
"enable_attribute_commenter", False
184184
)
185+
enable_transaction_spans = kwargs.get("enable_transaction_spans", True)
185186

186187
dbapi.wrap_connect(
187188
__name__,
@@ -194,6 +195,7 @@ def _instrument(self, **kwargs):
194195
enable_commenter=enable_sqlcommenter,
195196
commenter_options=commenter_options,
196197
enable_attribute_commenter=enable_attribute_commenter,
198+
enable_transaction_spans=enable_transaction_spans,
197199
)
198200

199201
def _uninstrument(self, **kwargs):
@@ -208,6 +210,7 @@ def instrument_connection(
208210
enable_commenter=None,
209211
commenter_options=None,
210212
enable_attribute_commenter=None,
213+
enable_transaction_spans=True,
211214
):
212215
"""Enable instrumentation in a MySQL connection.
213216
@@ -225,6 +228,8 @@ def instrument_connection(
225228
Optional configurations for tags to be appended at the sql query.
226229
enable_attribute_commenter:
227230
Optional flag to enable/disable addition of sqlcomment to span attribute (default False). Requires enable_commenter=True.
231+
enable_transaction_spans:
232+
Flag to enable/disable transaction spans (commit/rollback). Defaults to True.
228233
229234
Returns:
230235
An instrumented MySQL connection with OpenTelemetry tracing enabled.
@@ -240,6 +245,7 @@ def instrument_connection(
240245
commenter_options=commenter_options,
241246
connect_module=mysql.connector,
242247
enable_attribute_commenter=enable_attribute_commenter,
248+
enable_transaction_spans=enable_transaction_spans,
243249
)
244250

245251
def uninstrument_connection(self, connection):

0 commit comments

Comments
 (0)