Skip to content
25 changes: 16 additions & 9 deletions ddtrace/contrib/internal/ray/patch.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
from .utils import _inject_dd_trace_ctx_kwarg
from .utils import _inject_ray_span_tags
from .utils import extract_signature
from .utils import get_dd_job_name


logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -119,7 +120,7 @@ def _wrap_task_execution(wrapped, *args, **kwargs):

with long_running_ray_span(
f"{function_module}.{function_name}",
service=os.environ.get("_RAY_SUBMISSION_ID"),
service=get_dd_job_name(),
span_type=SpanTypes.RAY,
child_of=extracted_context,
activate=True,
Expand Down Expand Up @@ -150,7 +151,7 @@ def traced_submit_task(wrapped, instance, args, kwargs):
instance._function_signature = extract_signature(instance._function)

with tracer.trace(
f"{instance._function_name}.remote()", service=os.environ.get("_RAY_SUBMISSION_ID"), span_type=SpanTypes.RAY
f"{instance._function_name}.remote()", service=get_dd_job_name(), span_type=SpanTypes.RAY
) as span:
span.set_tag_str(SPAN_KIND, SpanKind.PRODUCER)
_inject_ray_span_tags(span)
Expand All @@ -170,8 +171,8 @@ def traced_submit_task(wrapped, instance, args, kwargs):
def traced_submit_job(wrapped, instance, args, kwargs):
"""Trace job submission. This function is also responsible
of creating the root span.
It will also inject _RAY_SUBMISSION_ID
in the env variable as some spans will not have access to it
It will also inject _RAY_SUBMISSION_ID and _RAY_JOB_NAME
in the env variable as some spans will not have access to them
trough ray_ctx
"""

Expand All @@ -180,23 +181,29 @@ def traced_submit_job(wrapped, instance, args, kwargs):

submission_id = kwargs.get("submission_id") or generate_job_id()
kwargs["submission_id"] = submission_id
job_name = kwargs.get("metadata", {}).get("job_name", "")

# Root span creation
job_span = tracer.start_span("ray.job", service=submission_id, span_type=SpanTypes.RAY)
job_span = tracer.start_span("ray.job", service=job_name or get_dd_job_name(submission_id), span_type=SpanTypes.RAY)
job_span.set_tag_str("component", "ray")
job_span.set_tag_str("ray.submission_id", submission_id)
tracer.context_provider.activate(job_span)
start_long_running_job(job_span)

try:
with tracer.trace("ray.job.submit", service=submission_id, span_type=SpanTypes.RAY) as submit_span:
with tracer.trace(
"ray.job.submit", service=job_name or get_dd_job_name(submission_id), span_type=SpanTypes.RAY
) as submit_span:
submit_span.set_tag_str("component", "ray")
submit_span.set_tag_str(SPAN_KIND, SpanKind.PRODUCER)
submit_span.set_tag_str("ray.submission_id", submission_id)

# Inject the context of the job so that ray.job.run is its child
env_vars = kwargs.setdefault("runtime_env", {}).setdefault("env_vars", {})
_TraceContext._inject(job_span.context, env_vars)
env_vars["_RAY_SUBMISSION_ID"] = submission_id
if job_name:
env_vars["_RAY_JOB_NAME"] = job_name
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we will eventually want to make _RAY_JOB_NAME and _RAY_SUBMISSION_ID string constants instead of strings, but in the interest of minimizing merge conflicts, I am postponing that change until a future PR.


try:
resp = wrapped(*args, **kwargs)
Expand Down Expand Up @@ -232,7 +239,7 @@ def traced_actor_method_call(wrapped, instance, args, kwargs):
tracer.context_provider.activate(_extract_tracing_context_from_env())

with tracer.trace(
f"{actor_name}.{method_name}.remote()", service=os.environ.get("_RAY_SUBMISSION_ID"), span_type=SpanTypes.RAY
f"{actor_name}.{method_name}.remote()", service=get_dd_job_name(), span_type=SpanTypes.RAY
) as span:
span.set_tag_str(SPAN_KIND, SpanKind.PRODUCER)
_inject_ray_span_tags(span)
Expand All @@ -257,7 +264,7 @@ async def _traced_run_method(self: Any, *args: Any, _dd_trace_ctx=None, **kwargs

with long_running_ray_span(
f"{self.__class__.__name__}.{method.__name__}",
service=submission_id,
service=get_dd_job_name(),
span_type=SpanTypes.RAY,
child_of=context,
activate=True,
Expand Down Expand Up @@ -285,7 +292,7 @@ def _trace_actor_method(self: Any, method: Callable[..., Any], dd_trace_ctx):

with long_running_ray_span(
f"{self.__class__.__name__}.{method.__name__}",
service=os.environ.get("_RAY_SUBMISSION_ID"),
service=get_dd_job_name(),
span_type=SpanTypes.RAY,
child_of=context,
activate=True,
Expand Down
24 changes: 24 additions & 0 deletions ddtrace/contrib/internal/ray/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,20 @@
from inspect import Parameter
from inspect import Signature
import os
import re
from typing import Any
from typing import Callable
from typing import List
from typing import Optional

from ddtrace.propagation.http import _TraceContext
import ray
from ray.runtime_context import get_runtime_context


JOB_NAME_REGEX = re.compile(r"^job\:([A-Za-z0-9_\.\-]+),run:([A-Za-z0-9_\.\-]+)$")


def _inject_dd_trace_ctx_kwarg(method: Callable) -> Signature:
old_sig = inspect.signature(method)
if "_dd_trace_ctx" in old_sig.parameters:
Expand Down Expand Up @@ -152,3 +157,22 @@ def check_cython(x):

# Check if function or method, respectively
return check_cython(obj) or (hasattr(obj, "__func__") and check_cython(obj.__func__))


def get_dd_job_name(submission_id: Optional[str] = None):
"""
Get the job name from the submission id.
If the submission id is not a valid job name, return the default job name.
If the submission id is not set, return the default job name.
"""
job_name = os.environ.get("_RAY_JOB_NAME")
if job_name:
return job_name
if submission_id is None:
submission_id = os.environ.get("_RAY_SUBMISSION_ID") or ""
match = JOB_NAME_REGEX.match(submission_id)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of extracting the first section of submission id, I would recommend asking users to send additional parameter in job metadata else default it to submission id.

@kanwang thoughts?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@savitagm I agree and like your idea about passing the job id through the job metadata, the reason I didn't do it right away in the same PR is that from a quick look it didn't seem like Ray passes the job metadata through the runtime context into every span. So I was going to make that change in a separate follow-up PR. I think if there is no model name in the metadata, we should still default to the first part of the submission ID and not to the entire submission ID, so that similar jobs get grouped together. Let me know if you disagree with either of these.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with savita on that.
We should let the users choose the name of the service otherwise setting something meaningful. I think the two bests choice would be : ray-job or the submission-id as it is as I think having raysubmit as a service name could lack a bit of meaning

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dubloom @savitagm I have updated the code to support two ways of specifying the job name:

ray job submit --metadata-json='{"job_name": "my_test_job"}'
ray job submit --submission_id="job:my_test_job,run:1"

If neither one of these is specified (no job name in the metadata, and the submission ID is not in the job:X,run:Y format), then the job name will fall back on the full submission ID string as in Louis' original code. Let me know if you think this is reasonable.

if match:
return match.group(1)
elif submission_id:
return submission_id
return None
1 change: 1 addition & 0 deletions tests/contrib/ray/test_ray.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ def submit_ray_job(script_name, timeout=60):
"meta.ray.job.message",
"meta.error.stack",
"meta._dd.base_service",
"meta._dd.hostname",
# Service names that include dynamic submission IDs
"service",
# Base service sometimes gets set to a different value in CI than in the local environment,
Expand Down
19 changes: 19 additions & 0 deletions tests/contrib/ray/test_ray_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import os

from ddtrace.contrib.internal.ray.utils import get_dd_job_name


def test_get_dd_job_name():
assert get_dd_job_name("job:frobnitzigate_idiosyncrasies,run:38") == "frobnitzigate_idiosyncrasies"
assert get_dd_job_name("joe.schmoe-cf32445c3b2842958956ba6b6225ad") == "joe.schmoe-cf32445c3b2842958956ba6b6225ad"
assert get_dd_job_name("mortar.clustering.pipeline") == "mortar.clustering.pipeline"
os.environ["_RAY_JOB_NAME"] = "train.cool.model"
assert get_dd_job_name("whatever") == "train.cool.model"
del os.environ["_RAY_JOB_NAME"]
assert get_dd_job_name() == "unspecified.ray.job"
os.environ["_RAY_SUBMISSION_ID"] = "job:frobnitzigate_idiosyncrasies,run:38"
assert get_dd_job_name() == "frobnitzigate_idiosyncrasies"
os.environ["_RAY_SUBMISSION_ID"] = "whatever"
assert get_dd_job_name() == "whatever"
del os.environ["_RAY_SUBMISSION_ID"]
assert get_dd_job_name() == "unspecified.ray.job"
Loading