Skip to content

Commit 2287c4c

Browse files
Add ThreadPoolExecutor metrics to opentelemetry-instrumentation-threading
1 parent 3b97e36 commit 2287c4c

File tree

1 file changed

+100
-25
lines changed
  • instrumentation/opentelemetry-instrumentation-threading/src/opentelemetry/instrumentation/threading

1 file changed

+100
-25
lines changed

instrumentation/opentelemetry-instrumentation-threading/src/opentelemetry/instrumentation/threading/__init__.py

Lines changed: 100 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -39,15 +39,18 @@
3939

4040
import threading
4141
from concurrent import futures
42+
from concurrent.futures import Future
4243
from typing import TYPE_CHECKING, Any, Callable, Collection
4344

45+
from opentelemetry.metrics import get_meter
4446
from wrapt import (
4547
wrap_function_wrapper, # type: ignore[reportUnknownVariableType]
4648
)
4749

4850
from opentelemetry import context
4951
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
5052
from opentelemetry.instrumentation.threading.package import _instruments
53+
from opentelemetry.instrumentation.threading.version import __version__
5154
from opentelemetry.instrumentation.utils import unwrap
5255

5356
if TYPE_CHECKING:
@@ -63,11 +66,40 @@ class ThreadingInstrumentor(BaseInstrumentor):
6366
__WRAPPER_START_METHOD = "start"
6467
__WRAPPER_RUN_METHOD = "run"
6568
__WRAPPER_SUBMIT_METHOD = "submit"
69+
__WRAPPER_INIT_METHOD = "__init__"
6670

6771
def instrumentation_dependencies(self) -> Collection[str]:
6872
return _instruments
6973

7074
def _instrument(self, **kwargs: Any):
75+
meter_provider = kwargs.get("meter_provider")
76+
meter = get_meter(
77+
__name__,
78+
__version__,
79+
meter_provider,
80+
schema_url="https://opentelemetry.io/schemas/1.38.0",
81+
)
82+
83+
self.working_items_count = meter.create_up_down_counter(
84+
name="python.threadpool.working_items.count",
85+
unit="threads",
86+
description="The number of jobs currently being processed by the thread pool",
87+
)
88+
self.queue_count = meter.create_up_down_counter(
89+
name="python.threadpool.queue.length",
90+
unit="threads",
91+
description="The number of jobs currently queued in the thread pool",
92+
)
93+
self.thread_count = meter.create_gauge(
94+
name="python.threadpool.thread.count",
95+
unit="threads",
96+
description="The maximum number of concurrent jobs allowed in the thread pool",
97+
)
98+
self.max_thread_count = meter.create_gauge(
99+
name="python.threadpool.thread.max_count",
100+
unit="threads",
101+
description="The maximum number of concurrent jobs allowed in the thread pool",
102+
)
71103
self._instrument_thread()
72104
self._instrument_timer()
73105
self._instrument_thread_pool()
@@ -103,12 +135,16 @@ def _instrument_timer():
103135
ThreadingInstrumentor.__wrap_threading_run,
104136
)
105137

106-
@staticmethod
107-
def _instrument_thread_pool():
138+
def _instrument_thread_pool(self):
139+
wrap_function_wrapper(
140+
futures.ThreadPoolExecutor,
141+
ThreadingInstrumentor.__WRAPPER_INIT_METHOD,
142+
self.__build_wrap_thread_pool_init(),
143+
)
108144
wrap_function_wrapper(
109145
futures.ThreadPoolExecutor,
110146
ThreadingInstrumentor.__WRAPPER_SUBMIT_METHOD,
111-
ThreadingInstrumentor.__wrap_thread_pool_submit,
147+
self.__build_wrap_thread_pool_submit(),
112148
)
113149

114150
@staticmethod
@@ -123,6 +159,10 @@ def _uninstrument_timer():
123159

124160
@staticmethod
125161
def _uninstrument_thread_pool():
162+
unwrap(
163+
futures.ThreadPoolExecutor,
164+
ThreadingInstrumentor.__WRAPPER_INIT_METHOD,
165+
)
126166
unwrap(
127167
futures.ThreadPoolExecutor,
128168
ThreadingInstrumentor.__WRAPPER_SUBMIT_METHOD,
@@ -153,26 +193,61 @@ def __wrap_threading_run(
153193
if token is not None:
154194
context.detach(token)
155195

156-
@staticmethod
157-
def __wrap_thread_pool_submit(
158-
call_wrapped: Callable[..., R],
159-
instance: futures.ThreadPoolExecutor,
160-
args: tuple[Callable[..., Any], ...],
161-
kwargs: dict[str, Any],
162-
) -> R:
163-
# obtain the original function and wrapped kwargs
164-
original_func = args[0]
165-
otel_context = context.get_current()
166-
167-
def wrapped_func(*func_args: Any, **func_kwargs: Any) -> R:
168-
token = None
196+
def __build_wrap_thread_pool_submit(self) -> Callable[..., Future[R]]:
197+
def __wrap_thread_pool_submit(
198+
call_wrapped: Callable[..., Future[R]],
199+
instance: futures.ThreadPoolExecutor,
200+
args: tuple[Callable[..., Any], ...],
201+
kwargs: dict[str, Any],
202+
) -> Future[R]:
203+
# obtain the original function and wrapped kwargs
204+
original_func = args[0]
205+
otel_context = context.get_current()
206+
attributes = {
207+
"threadpool.executor": instance._thread_name_prefix,
208+
}
209+
210+
def wrapped_func(*func_args: Any, **func_kwargs: Any) -> R:
211+
token = None
212+
try:
213+
token = context.attach(otel_context)
214+
self.queue_count.add(-1, attributes)
215+
self.working_items_count.add(1, attributes)
216+
return original_func(*func_args, **func_kwargs)
217+
finally:
218+
if token is not None:
219+
context.detach(token)
220+
221+
# replace the original function with the wrapped function
222+
new_args: tuple[Callable[..., Any], ...] = (wrapped_func,) + args[
223+
1:
224+
]
225+
self.queue_count.add(1, attributes)
169226
try:
170-
token = context.attach(otel_context)
171-
return original_func(*func_args, **func_kwargs)
172-
finally:
173-
if token is not None:
174-
context.detach(token)
175-
176-
# replace the original function with the wrapped function
177-
new_args: tuple[Callable[..., Any], ...] = (wrapped_func,) + args[1:]
178-
return call_wrapped(*new_args, **kwargs)
227+
future = call_wrapped(*new_args, **kwargs)
228+
except RuntimeError:
229+
self.queue_count.add(-1, attributes)
230+
raise
231+
232+
self.thread_count.set(len(instance._threads), attributes)
233+
future.add_done_callback(
234+
lambda _: self.working_items_count.add(-1, attributes)
235+
)
236+
return future
237+
238+
return __wrap_thread_pool_submit
239+
240+
def __build_wrap_thread_pool_init(self) -> Callable[..., None]:
241+
def __wrap_thread_pool_init(
242+
call_wrapped: Callable[..., None],
243+
instance: futures.ThreadPoolExecutor,
244+
args: tuple[Callable[..., Any], ...],
245+
kwargs: dict[str, Any],
246+
) -> None:
247+
call_wrapped(*args, **kwargs)
248+
attributes = {
249+
"threadpool.executor": instance._thread_name_prefix,
250+
}
251+
self.max_thread_count.set(instance._max_workers, attributes)
252+
253+
return __wrap_thread_pool_init

0 commit comments

Comments
 (0)