diff --git a/ddtrace/contrib/internal/ray/patch.py b/ddtrace/contrib/internal/ray/patch.py index dea7d2ec14a..629cd768ddc 100644 --- a/ddtrace/contrib/internal/ray/patch.py +++ b/ddtrace/contrib/internal/ray/patch.py @@ -46,6 +46,7 @@ from .utils import _inject_dd_trace_ctx_kwarg from .utils import _inject_ray_span_tags from .utils import extract_signature +from .utils import get_dd_job_name logger = logging.getLogger(__name__) @@ -119,7 +120,7 @@ def _wrap_task_execution(wrapped, *args, **kwargs): with long_running_ray_span( f"{function_module}.{function_name}", - service=os.environ.get("_RAY_SUBMISSION_ID"), + service=get_dd_job_name(), span_type=SpanTypes.RAY, child_of=extracted_context, activate=True, @@ -150,7 +151,7 @@ def traced_submit_task(wrapped, instance, args, kwargs): instance._function_signature = extract_signature(instance._function) with tracer.trace( - f"{instance._function_name}.remote()", service=os.environ.get("_RAY_SUBMISSION_ID"), span_type=SpanTypes.RAY + f"{instance._function_name}.remote()", service=get_dd_job_name(), span_type=SpanTypes.RAY ) as span: span.set_tag_str(SPAN_KIND, SpanKind.PRODUCER) _inject_ray_span_tags(span) @@ -170,8 +171,8 @@ def traced_submit_task(wrapped, instance, args, kwargs): def traced_submit_job(wrapped, instance, args, kwargs): """Trace job submission. This function is also responsible of creating the root span. - It will also inject _RAY_SUBMISSION_ID - in the env variable as some spans will not have access to it + It will also inject _RAY_SUBMISSION_ID and _RAY_JOB_NAME + in the env variable as some spans will not have access to them trough ray_ctx """ @@ -180,23 +181,29 @@ def traced_submit_job(wrapped, instance, args, kwargs): submission_id = kwargs.get("submission_id") or generate_job_id() kwargs["submission_id"] = submission_id + job_name = kwargs.get("metadata", {}).get("job_name", "") # Root span creation - job_span = tracer.start_span("ray.job", service=submission_id, span_type=SpanTypes.RAY) + job_span = tracer.start_span("ray.job", service=job_name or get_dd_job_name(submission_id), span_type=SpanTypes.RAY) job_span.set_tag_str("component", "ray") job_span.set_tag_str("ray.submission_id", submission_id) tracer.context_provider.activate(job_span) start_long_running_job(job_span) try: - with tracer.trace("ray.job.submit", service=submission_id, span_type=SpanTypes.RAY) as submit_span: + with tracer.trace( + "ray.job.submit", service=job_name or get_dd_job_name(submission_id), span_type=SpanTypes.RAY + ) as submit_span: submit_span.set_tag_str("component", "ray") submit_span.set_tag_str(SPAN_KIND, SpanKind.PRODUCER) + submit_span.set_tag_str("ray.submission_id", submission_id) # Inject the context of the job so that ray.job.run is its child env_vars = kwargs.setdefault("runtime_env", {}).setdefault("env_vars", {}) _TraceContext._inject(job_span.context, env_vars) env_vars["_RAY_SUBMISSION_ID"] = submission_id + if job_name: + env_vars["_RAY_JOB_NAME"] = job_name try: resp = wrapped(*args, **kwargs) @@ -232,7 +239,7 @@ def traced_actor_method_call(wrapped, instance, args, kwargs): tracer.context_provider.activate(_extract_tracing_context_from_env()) with tracer.trace( - f"{actor_name}.{method_name}.remote()", service=os.environ.get("_RAY_SUBMISSION_ID"), span_type=SpanTypes.RAY + f"{actor_name}.{method_name}.remote()", service=get_dd_job_name(), span_type=SpanTypes.RAY ) as span: span.set_tag_str(SPAN_KIND, SpanKind.PRODUCER) _inject_ray_span_tags(span) @@ -257,7 +264,7 @@ async def _traced_run_method(self: Any, *args: Any, _dd_trace_ctx=None, **kwargs with long_running_ray_span( f"{self.__class__.__name__}.{method.__name__}", - service=submission_id, + service=get_dd_job_name(), span_type=SpanTypes.RAY, child_of=context, activate=True, @@ -285,7 +292,7 @@ def _trace_actor_method(self: Any, method: Callable[..., Any], dd_trace_ctx): with long_running_ray_span( f"{self.__class__.__name__}.{method.__name__}", - service=os.environ.get("_RAY_SUBMISSION_ID"), + service=get_dd_job_name(), span_type=SpanTypes.RAY, child_of=context, activate=True, diff --git a/ddtrace/contrib/internal/ray/utils.py b/ddtrace/contrib/internal/ray/utils.py index ef0f794b83b..3c5b34004cb 100644 --- a/ddtrace/contrib/internal/ray/utils.py +++ b/ddtrace/contrib/internal/ray/utils.py @@ -2,15 +2,20 @@ from inspect import Parameter from inspect import Signature import os +import re from typing import Any from typing import Callable from typing import List +from typing import Optional from ddtrace.propagation.http import _TraceContext import ray from ray.runtime_context import get_runtime_context +JOB_NAME_REGEX = re.compile(r"^job\:([A-Za-z0-9_\.\-]+),run:([A-Za-z0-9_\.\-]+)$") + + def _inject_dd_trace_ctx_kwarg(method: Callable) -> Signature: old_sig = inspect.signature(method) if "_dd_trace_ctx" in old_sig.parameters: @@ -152,3 +157,22 @@ def check_cython(x): # Check if function or method, respectively return check_cython(obj) or (hasattr(obj, "__func__") and check_cython(obj.__func__)) + + +def get_dd_job_name(submission_id: Optional[str] = None): + """ + Get the job name from the submission id. + If the submission id is not a valid job name, return the default job name. + If the submission id is not set, return the default job name. + """ + job_name = os.environ.get("_RAY_JOB_NAME") + if job_name: + return job_name + if submission_id is None: + submission_id = os.environ.get("_RAY_SUBMISSION_ID") or "" + match = JOB_NAME_REGEX.match(submission_id) + if match: + return match.group(1) + elif submission_id: + return submission_id + return None diff --git a/tests/contrib/ray/test_ray.py b/tests/contrib/ray/test_ray.py index fc0d35c9086..21f6b7b4784 100644 --- a/tests/contrib/ray/test_ray.py +++ b/tests/contrib/ray/test_ray.py @@ -49,6 +49,7 @@ def submit_ray_job(script_name, timeout=60): "meta.ray.job.message", "meta.error.stack", "meta._dd.base_service", + "meta._dd.hostname", # Service names that include dynamic submission IDs "service", # Base service sometimes gets set to a different value in CI than in the local environment, diff --git a/tests/contrib/ray/test_ray_utils.py b/tests/contrib/ray/test_ray_utils.py new file mode 100644 index 00000000000..592ba5abfcc --- /dev/null +++ b/tests/contrib/ray/test_ray_utils.py @@ -0,0 +1,19 @@ +import os + +from ddtrace.contrib.internal.ray.utils import get_dd_job_name + + +def test_get_dd_job_name(): + assert get_dd_job_name("job:frobnitzigate_idiosyncrasies,run:38") == "frobnitzigate_idiosyncrasies" + assert get_dd_job_name("joe.schmoe-cf32445c3b2842958956ba6b6225ad") == "joe.schmoe-cf32445c3b2842958956ba6b6225ad" + assert get_dd_job_name("mortar.clustering.pipeline") == "mortar.clustering.pipeline" + os.environ["_RAY_JOB_NAME"] = "train.cool.model" + assert get_dd_job_name("whatever") == "train.cool.model" + del os.environ["_RAY_JOB_NAME"] + assert get_dd_job_name() == "unspecified.ray.job" + os.environ["_RAY_SUBMISSION_ID"] = "job:frobnitzigate_idiosyncrasies,run:38" + assert get_dd_job_name() == "frobnitzigate_idiosyncrasies" + os.environ["_RAY_SUBMISSION_ID"] = "whatever" + assert get_dd_job_name() == "whatever" + del os.environ["_RAY_SUBMISSION_ID"] + assert get_dd_job_name() == "unspecified.ray.job"