-
Notifications
You must be signed in to change notification settings - Fork 457
chore(mlobs): infer the name of the job from the submission ID #14540
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 6 commits
e571992
51690f6
7d56dc2
0ed6e6b
c336200
a595b3b
f16e29b
25e75b9
b4b51d9
0cca4f8
25e534a
9deb7f2
3357997
14644b4
9fcf437
3b3479b
9d6ff25
83a9af3
a1fcefe
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,8 +3,8 @@ | |
import inspect | ||
import logging | ||
import os | ||
import threading | ||
import socket | ||
import threading | ||
from typing import Any | ||
from typing import Callable | ||
from typing import Dict | ||
|
@@ -28,10 +28,10 @@ | |
from ddtrace._trace.span import Span | ||
from ddtrace.constants import _DJM_ENABLED_KEY | ||
from ddtrace.constants import _FILTER_KEPT_KEY | ||
from ddtrace.constants import _HOSTNAME_KEY | ||
from ddtrace.constants import _SAMPLING_PRIORITY_KEY | ||
from ddtrace.constants import _SPAN_MEASURED_KEY | ||
from ddtrace.constants import SPAN_KIND | ||
from ddtrace.constants import _HOSTNAME_KEY | ||
from ddtrace.ext import SpanKind | ||
from ddtrace.ext import SpanTypes | ||
from ddtrace.internal.schema import schematize_service_name | ||
|
@@ -46,6 +46,7 @@ | |
from .utils import _inject_dd_trace_ctx_kwarg | ||
from .utils import _inject_ray_span_tags | ||
from .utils import extract_signature | ||
from .utils import get_dd_job_name | ||
|
||
|
||
logger = logging.getLogger(__name__) | ||
|
@@ -133,9 +134,7 @@ def _wrap_task_execution(wrapped, *args, **kwargs): | |
function_name = getattr(wrapped, "__name__", "unknown_function") | ||
function_module = getattr(wrapped, "__module__", "unknown_module") | ||
|
||
with tracer.trace( | ||
f"{function_module}.{function_name}", service=os.environ.get("_RAY_SUBMISSION_ID"), span_type=SpanTypes.RAY | ||
) as span: | ||
with tracer.trace(f"{function_module}.{function_name}", service=get_dd_job_name(), span_type=SpanTypes.RAY) as span: | ||
span.set_tag_str(SPAN_KIND, SpanKind.CONSUMER) | ||
_inject_ray_span_tags(span) | ||
|
||
|
@@ -165,7 +164,7 @@ def traced_submit_task(wrapped, instance, args, kwargs): | |
instance._function_signature = extract_signature(instance._function) | ||
|
||
with tracer.trace( | ||
f"{instance._function_name}.remote()", service=os.environ.get("_RAY_SUBMISSION_ID"), span_type=SpanTypes.RAY | ||
f"{instance._function_name}.remote()", service=get_dd_job_name(), span_type=SpanTypes.RAY | ||
) as span: | ||
span.set_tag_str(SPAN_KIND, SpanKind.PRODUCER) | ||
_inject_ray_span_tags(span) | ||
|
@@ -185,8 +184,8 @@ def traced_submit_task(wrapped, instance, args, kwargs): | |
def traced_submit_job(wrapped, instance, args, kwargs): | ||
"""Trace job submission. This function is also responsible | ||
for creating the root span. | ||
It will also inject _RAY_SUBMISSION_ID | ||
in the env variable as some spans will not have access to it | ||
It will also inject _RAY_SUBMISSION_ID and _RAY_JOB_NAME | ||
in the env variable as some spans will not have access to them | ||
through ray_ctx | ||
""" | ||
|
||
|
@@ -195,9 +194,10 @@ def traced_submit_job(wrapped, instance, args, kwargs): | |
|
||
submission_id = kwargs.get("submission_id") or generate_job_id() | ||
kwargs["submission_id"] = submission_id | ||
job_name = kwargs.get("metadata", {}).get("job_name", "") | ||
|
||
# Root span creation | ||
job_span = tracer.start_span("ray.job", service=submission_id, span_type=SpanTypes.RAY) | ||
job_span = tracer.start_span("ray.job", service=job_name or get_dd_job_name(submission_id), span_type=SpanTypes.RAY) | ||
job_span.set_tag_str("component", "ray") | ||
job_span.set_tag_str("ray.submission_id", submission_id) | ||
# This will allow to finish the span at the end of the job | ||
|
@@ -206,14 +206,19 @@ def traced_submit_job(wrapped, instance, args, kwargs): | |
# Set global span as the root span | ||
tracer.context_provider.activate(job_span) | ||
try: | ||
with tracer.trace("ray.job.submit", service=submission_id, span_type=SpanTypes.RAY) as submit_span: | ||
with tracer.trace( | ||
"ray.job.submit", service=job_name or get_dd_job_name(submission_id), span_type=SpanTypes.RAY | ||
) as submit_span: | ||
submit_span.set_tag_str("component", "ray") | ||
submit_span.set_tag_str(SPAN_KIND, SpanKind.PRODUCER) | ||
submit_span.set_tag_str("ray.submission_id", submission_id) | ||
|
||
# Inject the context of the job so that ray.job.run is its child | ||
env_vars = kwargs.setdefault("runtime_env", {}).setdefault("env_vars", {}) | ||
_TraceContext._inject(job_span.context, env_vars) | ||
env_vars["_RAY_SUBMISSION_ID"] = submission_id | ||
if job_name: | ||
env_vars["_RAY_JOB_NAME"] = job_name | ||
|
||
|
||
try: | ||
resp = wrapped(*args, **kwargs) | ||
|
@@ -250,7 +255,7 @@ def traced_actor_method_call(wrapped, instance, args, kwargs): | |
tracer.context_provider.activate(_extract_tracing_context_from_env()) | ||
|
||
with tracer.trace( | ||
f"{actor_name}.{method_name}.remote()", service=os.environ.get("_RAY_SUBMISSION_ID"), span_type=SpanTypes.RAY | ||
f"{actor_name}.{method_name}.remote()", service=get_dd_job_name(), span_type=SpanTypes.RAY | ||
) as span: | ||
span.set_tag_str(SPAN_KIND, SpanKind.PRODUCER) | ||
_inject_ray_span_tags(span) | ||
|
@@ -276,7 +281,7 @@ async def _traced_run_method(self: Any, *args: Any, _dd_trace_ctx=None, **kwargs | |
job_submission_id = os.environ.get("_RAY_SUBMISSION_ID") | ||
|
||
with dd_tracer.trace( | ||
f"{self.__class__.__name__}.{method.__name__}", service=job_submission_id, span_type=SpanTypes.RAY | ||
f"{self.__class__.__name__}.{method.__name__}", service=get_dd_job_name(), span_type=SpanTypes.RAY | ||
) as span: | ||
span.set_tag_str(SPAN_KIND, SpanKind.CONSUMER) | ||
_inject_ray_span_tags(span) | ||
|
@@ -305,7 +310,7 @@ def _trace_actor_method(self: Any, method: Callable[..., Any], dd_trace_ctx): | |
|
||
with tracer.trace( | ||
f"{self.__class__.__name__}.{method.__name__}", | ||
service=os.environ.get("_RAY_SUBMISSION_ID"), | ||
service=get_dd_job_name(), | ||
span_type=SpanTypes.RAY, | ||
) as span: | ||
span.set_tag_str(SPAN_KIND, SpanKind.CONSUMER) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,16 +2,22 @@ | |
from inspect import Parameter | ||
from inspect import Signature | ||
import os | ||
import re | ||
from typing import Any | ||
from typing import Callable | ||
from typing import List | ||
from typing import Optional | ||
|
||
import ray | ||
from ray.runtime_context import get_runtime_context | ||
|
||
from ddtrace.propagation.http import _TraceContext | ||
|
||
|
||
DEFAULT_JOB_NAME = "unspecified.ray.job" | ||
JOB_NAME_REGEX = re.compile(r"^job\:([A-Za-z0-9_\.\-]+),run:([A-Za-z0-9_\.\-]+)$") | ||
|
||
|
||
def _inject_dd_trace_ctx_kwarg(method: Callable) -> Signature: | ||
old_sig = inspect.signature(method) | ||
if "_dd_trace_ctx" in old_sig.parameters: | ||
|
@@ -153,3 +159,22 @@ def check_cython(x): | |
|
||
# Check if function or method, respectively | ||
return check_cython(obj) or (hasattr(obj, "__func__") and check_cython(obj.__func__)) | ||
|
||
|
||
def get_dd_job_name(submission_id: Optional[str] = None): | ||
""" | ||
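Get the job name, preferring the _RAY_JOB_NAME environment variable when it is set. | ||
Otherwise derive it from the submission id (read from the _RAY_SUBMISSION_ID | ||
environment variable when no submission id is passed): | ||
if the submission id has the form "job:<name>,run:<run>", return <name>; | ||
if it is any other non-empty string, return the submission id itself. | ||
If the submission id is empty or not set, return the default job name. | ||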
""" | ||
job_name = os.environ.get("_RAY_JOB_NAME") | ||
if job_name: | ||
return job_name | ||
if submission_id is None: | ||
submission_id = os.environ.get("_RAY_SUBMISSION_ID") or "" | ||
match = JOB_NAME_REGEX.match(submission_id) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Instead of extracting the first section of submission id, I would recommend asking users to send additional parameter in job metadata else default it to submission id. @kanwang thoughts? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @savitagm I agree and like your idea about passing the job id through the job metadata, the reason I didn't do it right away in the same PR is that from a quick look it didn't seem like Ray passes the job metadata through the runtime context into every span. So I was going to make that change in a separate follow-up PR. I think if there is no model name in the metadata, we should still default to the first part of the submission ID and not to the entire submission ID, so that similar jobs get grouped together. Let me know if you disagree with either of these. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I agree with savita on that. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @dubloom @savitagm I have updated the code to support two ways of specifying the job name:
If neither one of these is specified (no job name in the metadata, and the submission ID is not in the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In the final version of the code we fall back on the name of the Python script from the Ray entry point, and then if that could not be found either, on |
||
if match: | ||
return match.group(1) | ||
elif submission_id: | ||
return submission_id | ||
return DEFAULT_JOB_NAME |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
import os | ||
|
||
from ddtrace.contrib.internal.ray.utils import get_dd_job_name | ||
|
||
|
||
def test_get_dd_job_name():
    """Check how get_dd_job_name resolves a job name.

    Covers the ``job:<name>,run:<run>`` submission id format, submission ids
    that do not match that format (returned unchanged), and the precedence of
    the ``_RAY_JOB_NAME`` environment variable over the submission id.
    """
    # Start from a known state: a pre-existing _RAY_JOB_NAME would take
    # precedence and make the submission-id assertions below fail.
    saved_job_name = os.environ.pop("_RAY_JOB_NAME", None)
    try:
        assert get_dd_job_name("job:frobnitzigate_idiosyncrasies,run:38") == "frobnitzigate_idiosyncrasies"
        assert (
            get_dd_job_name("joe.schmoe-cf32445c3b2842958956ba6b6225ad")
            == "joe.schmoe-cf32445c3b2842958956ba6b6225ad"
        )
        assert get_dd_job_name("mortar.clustering.pipeline") == "mortar.clustering.pipeline"

        os.environ["_RAY_JOB_NAME"] = "train.cool.model"
        assert get_dd_job_name("whatever") == "train.cool.model"
    finally:
        # Do not leak the variable into other tests running in this process.
        os.environ.pop("_RAY_JOB_NAME", None)
        if saved_job_name is not None:
            os.environ["_RAY_JOB_NAME"] = saved_job_name
Uh oh!
There was an error while loading. Please reload this page.