Commit 7f7bcc8

fix(ray): increase ray instrumentation stability (#15446)
Ray integration was until very recently only tested internally. We had several issues with instrumenting jobs depending on when Ray was initialized, and some crashes related to context injection. This PR fixes all of these small issues and makes the integration stable. The Ray integration should be advertised only for versions that include this fix. The release note covers the only customer-facing issue.

## Context

During internal dogfooding, some bugs were found and some assumptions were challenged:

- Most users can't call `ray.init()` at the top of the file (which was a prerequisite for fully working instrumentation). After analyzing all the consequences of calling `ray.init()` late or waiting for auto-initialization (for example, when a task is submitted for the first time), it appeared that the only viable solution was to change the way we instrument the cluster.
- Some Ray internal tasks were instrumented and created non-actionable, non-useful orphan spans.
- We still had issues with context injection.
- Service names did not work properly when unspecified.

## Changes

- Update docs to recommend using ddtrace-run. Note that we advise deactivating some integrations, because otherwise instrumenting a Ray cluster is very noisy. We will be able to remove that if we make an upstream contribution.
- Fix service names and specify their precedence order in the docs.
- Prevent some potential future bugs by not instrumenting cross-language tasks.
- Add a fix for injecting `dd_trace_ctx` into tasks.

## Testing

I added tests to ensure all the behaviors are covered:

- Added tests related to service names.
- Added tests that demonstrate the issues with not calling `ray.init()`, so the behavior is documented and any changes will be detected. These tests also ensure that nothing crashes.

## Additional Notes

The Ray test suite was run with ddtrace activated, increasing confidence in this PR.
1 parent fe695a8 commit 7f7bcc8

20 files changed (+1777, −354 lines)

ddtrace/contrib/internal/ray/__init__.py

Lines changed: 11 additions & 14 deletions
@@ -10,17 +10,18 @@
 Ray instrumentation is experimental. It is deactivated by default. To enable it,
 you have to follow one of the two methods below:
 
-The recommended way to instrument Ray, is to instrument the Ray cluster.
-You can do it by starting the Ray head with a tracing startup hook::
+The recommended way to instrument Ray, is to instrument the Ray cluster using ddtrace-run::
 
-    ray start --head --tracing-startup-hook=ddtrace.contrib.ray:setup_tracing
+    DD_PATCH_MODULES="ray:true, aiohttp:false, grpc:false, requests:false" ddtrace-run ray start --head
 
-Otherwise, you can specify the tracing hook in `ray.init()` using::
+DD_PATCH_MODULES will allow to reduce noise by sending only the jobs related spans.
 
-    ray.init(_tracing_startup_hook="ddtrace.contrib.ray:setup_tracing")
+You can also do it by starting Ray head with a tracing startup hook::
 
-Note that this method does not provide full tracing capabilities.
+    ray start --head --tracing-startup-hook=ddtrace.contrib.ray:setup_tracing
 
+Note that this method does not provide full tracing capabilities if ``ray.init()`` is not called at the top
+of your job scripts.
 
 Configuration
 ~~~~~~~~~~~~~
@@ -44,21 +45,17 @@
 (default: ``True``). If ``True``, file paths in the entrypoint will be redacted to avoid
 leaking sensitive information.
 
-Ray service name can be configured by:
+Ray service name can be configured, in order of precedence by:
 
-- specifying in submission ID using ``job:your-job-name`` during job submission::
+- specifying ``DD_SERVICE`` when initializing your Ray cluster.
 
-    ray job submit --submission-id="job:my_model,run:39" -- python entrypoint.py
+- setting ``DD_TRACE_RAY_USE_ENTRYPOINT_AS_SERVICE_NAME=True``. In this case, the service
+  name will be the name of your entrypoint script.
 
 - specifying in metadata during job submission::
 
     ray job submit --metadata-json='{"job_name": "my_model"}' -- python entrypoint.py
 
-- specifying ``DD_SERVICE`` when initializing your Ray cluster.
-
-- setting ``DD_TRACE_RAY_USE_ENTRYPOINT_AS_SERVICE_NAME=True``. In this case, the service
-  name will be the name of your entrypoint script.
-
 By default, the service name will be ``unnamed.ray.job``.
 
 Notes
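To make the docstring's caveat concrete, here is a minimal job script of the shape the updated docs assume (file name and task are hypothetical), with ``ray.init()`` called at the top as the startup-hook method requires for complete traces:

```python
# entrypoint.py -- hypothetical job script illustrating the caveat above:
# with the --tracing-startup-hook method, ray.init() must run at the top,
# before any task or actor is created, for tracing to be complete.
import ray

ray.init()  # early, explicit init; late/auto init is what the ddtrace-run path handles


@ray.remote
def double(x):
    return x * 2


if __name__ == "__main__":
    print(ray.get(double.remote(21)))  # 42
```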

ddtrace/contrib/internal/ray/patch.py

Lines changed: 34 additions & 18 deletions
@@ -68,8 +68,14 @@
 RAY_SERVICE_NAME = os.environ.get(RAY_JOB_NAME)
 
 # Ray modules that should be excluded from tracing
-RAY_MODULE_DENYLIST = {
-    "ray.dag",
+RAY_COMMON_MODULE_DENYLIST = {
+    "ray.data._internal",
+}
+
+RAY_TASK_MODULE_DENYLIST = {*RAY_COMMON_MODULE_DENYLIST}
+
+RAY_ACTOR_MODULE_DENYLIST = {
+    *RAY_COMMON_MODULE_DENYLIST,
     "ray.experimental",
     "ray.data._internal",
 }
@@ -143,6 +149,11 @@ def _wrap_task_execution(wrapped, *args, **kwargs):
 
 def traced_submit_task(wrapped, instance, args, kwargs):
     """Trace task submission, i.e the func.remote() call"""
+
+    # Tracing doesn't work for cross lang yet.
+    if instance._function.__module__ in RAY_TASK_MODULE_DENYLIST or instance._is_cross_language:
+        return wrapped(*args, **kwargs)
+
     if tracer.current_span() is None:
         log.debug(
             "No active span found in %s.remote(), activating trace context from environment", instance._function_name
@@ -153,11 +164,10 @@ def traced_submit_task(wrapped, instance, args, kwargs):
     # This is done under a lock as multiple task could be submit at the same time
     # and thus try to modify the signature as the same time
     with instance._inject_lock:
-        if not getattr(instance._function, "_dd_trace_wrapped", False):
+        if instance._function_signature is None:
             instance._function = _wrap_remote_function_execution(instance._function)
             instance._function.__signature__ = _inject_dd_trace_ctx_kwarg(instance._function)
             instance._function_signature = extract_signature(instance._function)
-            instance._function._dd_trace_wrapped = True
 
     with tracer.trace(
         "task.submit",
@@ -197,22 +207,24 @@ def traced_submit_job(wrapped, instance, args, kwargs):
     """
     from ray.dashboard.modules.job.job_manager import generate_job_id
 
-    # Three ways of specifying the job name, in order of precedence:
-    # 1. Metadata JSON: ray job submit --metadata_json '{"job_name": "train.cool.model"}' train.py
-    # 2. Special submission ID format: ray job submit --submission_id "job:train.cool.model,run:38" train.py
-    # 3. Ray entrypoint: ray job submit train_cool_model.py
+    # Three ways of setting the service name of the spans, in order of precedence:
+    # - DD_SERVICE environment variable
+    # - The name of the entrypoint if DD_TRACE_RAY_USE_ENTRYPOINT_AS_SERVICE_NAME is True
+    # - Metadata JSON: ray job submit --metadata_json '{"job_name": "train.cool.model"}'
+    # Otherwise set to unnamed.ray.job
     submission_id = kwargs.get("submission_id") or generate_job_id()
     kwargs["submission_id"] = submission_id
+
     entrypoint = kwargs.get("entrypoint", "")
-    if entrypoint and config.ray.redact_entrypoint_paths:
+    if config.ray.redact_entrypoint_paths:
         entrypoint = redact_paths(entrypoint)
-    job_name = config.service or kwargs.get("metadata", {}).get("job_name", "")
 
-    if not job_name:
-        if config.ray.use_entrypoint_as_service_name:
-            job_name = get_dd_job_name_from_entrypoint(entrypoint) or DEFAULT_JOB_NAME
-        else:
-            job_name = DEFAULT_JOB_NAME
+    if config.ray.use_entrypoint_as_service_name:
+        job_name = get_dd_job_name_from_entrypoint(entrypoint) or DEFAULT_JOB_NAME
+    else:
+        user_provided_service = config.service if config._is_user_provided_service else None
+        metadata_job_name = kwargs.get("metadata", {}).get("job_name", None)
+        job_name = user_provided_service or metadata_job_name or DEFAULT_JOB_NAME
 
     job_span = tracer.start_span("ray.job", service=job_name or DEFAULT_JOB_NAME, span_type=SpanTypes.RAY)
     try:
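A minimal sketch of the resolution logic above, mirroring the code path (note the entrypoint flag branch runs first); `resolve_job_name` is illustrative, not the library's API:

```python
DEFAULT_JOB_NAME = "unnamed.ray.job"  # matches the documented default


def resolve_job_name(use_entrypoint, entrypoint_name, user_service, metadata):
    # Mirrors the hunk: entrypoint flag branch, else user service > metadata.
    if use_entrypoint:
        return entrypoint_name or DEFAULT_JOB_NAME
    return user_service or metadata.get("job_name") or DEFAULT_JOB_NAME


assert resolve_job_name(False, None, None, {}) == "unnamed.ray.job"
assert resolve_job_name(False, None, "svc", {"job_name": "m"}) == "svc"
assert resolve_job_name(False, None, None, {"job_name": "m"}) == "m"
assert resolve_job_name(True, "train.py", "svc", {}) == "train.py"
```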
@@ -380,12 +392,12 @@ def traced_wait(wrapped, instance, args, kwargs):
 
 
 def _job_supervisor_run_wrapper(method: Callable[..., Any]) -> Any:
-    async def _traced_run_method(self: Any, *args: Any, _dd_ray_trace_ctx, **kwargs: Any) -> Any:
+    async def _traced_run_method(self: Any, *args: Any, _dd_ray_trace_ctx=None, **kwargs: Any) -> Any:
         import ray.exceptions
 
         from ddtrace.ext import SpanTypes
 
-        context = _TraceContext._extract(_dd_ray_trace_ctx)
+        context = _TraceContext._extract(_dd_ray_trace_ctx) if _dd_ray_trace_ctx else None
         submission_id = os.environ.get(RAY_SUBMISSION_ID)
 
         with long_running_ray_span(
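The change makes the injected trace context optional. A sketch of the pattern with stand-in names: a run method that no longer fails when `_dd_ray_trace_ctx` was never injected:

```python
from typing import Any, Optional


def extract_context(carrier: Optional[dict]) -> Optional[dict]:
    # Stand-in for _TraceContext._extract.
    return dict(carrier) if carrier else None


async def run(*args: Any, _dd_ray_trace_ctx: Optional[dict] = None, **kwargs: Any) -> None:
    # Previously a required kwarg: a caller without injection raised TypeError.
    context = extract_context(_dd_ray_trace_ctx) if _dd_ray_trace_ctx else None
    # ...continue the trace under `context`, or start a fresh one if None...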
@@ -497,7 +509,7 @@ def inject_tracing_into_actor_class(wrapped, instance, args, kwargs):
     class_name = str(cls.__name__)
 
     # Skip tracing for certain ray modules
-    if any(module_name.startswith(denied_module) for denied_module in RAY_MODULE_DENYLIST):
+    if any(module_name.startswith(denied_module) for denied_module in RAY_ACTOR_MODULE_DENYLIST):
         return cls
 
     # Actor beginning with _ are considered internal and will not be traced
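Note the actor check is a prefix match (`startswith`), unlike the exact `in` membership test used for tasks earlier in this file, so submodules of a denied package are skipped too:

```python
RAY_ACTOR_MODULE_DENYLIST = {"ray.data._internal", "ray.experimental"}


def is_denied(module_name: str) -> bool:
    # Prefix match: any submodule of a denied package is also denied.
    return any(module_name.startswith(denied) for denied in RAY_ACTOR_MODULE_DENYLIST)


assert is_denied("ray.experimental.channel")  # submodule: skipped
assert not is_denied("my_project.actors")     # user code: traced
```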
@@ -557,6 +569,10 @@ def patch():
 
     ray._datadog_patch = True
 
+    from ray.util.tracing import tracing_helper
+
+    tracing_helper._global_is_tracing_enabled = False
+
     @ModuleWatchdog.after_module_imported("ray.actor")
     def _(m):
         _w(m.ActorHandle, "_actor_method_call", traced_actor_method_call)
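The new lines force-disable Ray's built-in OpenTelemetry tracing helper, presumably so ddtrace is the only tracer active. An illustrative check only: `_global_is_tracing_enabled` is the private Ray flag the hunk sets, and this assumes the ray integration has been patched (e.g. `DD_PATCH_MODULES="ray:true"` with `ddtrace-run`):

```python
from ray.util.tracing import tracing_helper

# Private flag set by patch() in the hunk above; not a public API.
assert tracing_helper._global_is_tracing_enabled is False
```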
Lines changed: 7 additions & 0 deletions
@@ -0,0 +1,7 @@
+---
+fixes:
+  - |
+    ray: This fix resolves an issue where Ray jobs that did not explicitly call ``ray.init()`` at the top of their scripts
+    were not properly instrumented, resulting in incomplete traces.
+    To ensure full tracing capabilities, use ``ddtrace-run`` when starting your Ray cluster:
+    ``DD_PATCH_MODULES="ray:true,aiohttp:false,grpc:false,requests:false" ddtrace-run ray start --head``.

tests/contrib/ray/jobs/actor_and_task.py

Lines changed: 0 additions & 62 deletions
This file was deleted.

tests/contrib/ray/jobs/actor_interactions.py

Lines changed: 0 additions & 49 deletions
This file was deleted.
Lines changed: 24 additions & 0 deletions
@@ -0,0 +1,24 @@
+import ray
+
+
+@ray.remote
+class Counter:
+    def __init__(self):
+        self.value = 0
+
+    def increment(self):
+        self.value += 1
+        return self.value
+
+    def get_value(self):
+        return self.value
+
+
+def main():
+    counter = Counter.remote()
+    result = ray.get(counter.increment.remote())
+    assert result == 1, f"Expected 1, got {result}"
+
+
+if __name__ == "__main__":
+    main()

tests/contrib/ray/jobs/error_in_task.py

Lines changed: 0 additions & 21 deletions
This file was deleted.

tests/contrib/ray/jobs/nested_tasks.py

Lines changed: 0 additions & 27 deletions
This file was deleted.

tests/contrib/ray/jobs/service.py

Lines changed: 16 additions & 0 deletions
@@ -0,0 +1,16 @@
+import ray
+
+
+@ray.remote
+def add_one(x):
+    return x + 1
+
+
+def main():
+    futures_add_one = add_one.remote(0)
+    result = ray.get(futures_add_one)
+    assert result == 1, f"Unexpected results: {result}"
+
+
+if __name__ == "__main__":
+    main()
