Skip to content

Commit 330789e

Browse files
authored
Fixes issue RuntimeTaskInstance does not contain log_url | added during taskrunner startup (#50376)
1 parent 2bc2de8 commit 330789e

File tree

4 files changed

+25
-0
lines changed

4 files changed

+25
-0
lines changed

providers/openlineage/src/airflow/providers/openlineage/utils/utils.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -444,6 +444,7 @@ class TaskInstanceInfo(InfoJsonEncodable):
444444

445445
includes = ["duration", "try_number", "pool", "queued_dttm", "log_url"]
446446
casts = {
447+
"log_url": lambda ti: getattr(ti, "log_url", None),
447448
"map_index": lambda ti: ti.map_index if getattr(ti, "map_index", -1) != -1 else None,
448449
"dag_bundle_version": lambda ti: (
449450
ti.bundle_instance.version if hasattr(ti, "bundle_instance") else None

providers/openlineage/tests/unit/openlineage/utils/test_utils.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1499,6 +1499,7 @@ def test_taskinstance_info_af3():
14991499
runtime_ti.bundle_instance = bundle_instance
15001500

15011501
assert dict(TaskInstanceInfo(runtime_ti)) == {
1502+
"log_url": None,
15021503
"map_index": 2,
15031504
"try_number": 1,
15041505
"dag_bundle_version": "bundle_version",

task-sdk/src/airflow/sdk/execution_time/task_runner.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,8 @@ class RuntimeTaskInstance(TaskInstance):
137137

138138
rendered_map_index: str | None = None
139139

140+
log_url: str | None = None
141+
140142
def __rich_repr__(self):
141143
yield "id", self.id
142144
yield "task_id", self.task_id
@@ -546,6 +548,23 @@ def _xcom_push_to_db(ti: RuntimeTaskInstance, key: str, value: Any) -> None:
546548
)
547549

548550

551+
def get_log_url_from_ti(ti: RuntimeTaskInstance) -> str:
552+
from urllib.parse import quote
553+
554+
from airflow.configuration import conf
555+
556+
run_id = quote(ti.run_id)
557+
base_url = conf.get("api", "base_url", fallback="http://localhost:8080/")
558+
map_index_value = getattr(ti, "map_index", -1)
559+
map_index = f"/mapped/{map_index_value}" if map_index_value is not None and map_index_value >= 0 else ""
560+
try_number_value = getattr(ti, "try_number", 0)
561+
try_number = (
562+
f"?try_number={try_number_value}" if try_number_value is not None and try_number_value > 0 else ""
563+
)
564+
_log_uri = f"{base_url}dags/{ti.dag_id}/runs/{run_id}/tasks/{ti.task_id}{map_index}{try_number}"
565+
return _log_uri
566+
567+
549568
def parse(what: StartupDetails, log: Logger) -> RuntimeTaskInstance:
550569
# TODO: Task-SDK:
551570
# Using DagBag here is about 98% wrong, but it'll do for now
@@ -703,6 +722,7 @@ def startup() -> tuple[RuntimeTaskInstance, Context, Logger]:
703722

704723
with _airflow_parsing_context_manager(dag_id=msg.ti.dag_id, task_id=msg.ti.task_id):
705724
ti = parse(msg, log)
725+
ti.log_url = get_log_url_from_ti(ti)
706726
log.debug("DAG file parsed", file=msg.dag_rel_path)
707727
else:
708728
raise RuntimeError(f"Unhandled startup message {type(msg)} {msg}")

task-sdk/tests/task_sdk/execution_time/test_task_runner.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@
106106
_push_xcom_if_needed,
107107
_xcom_push,
108108
finalize,
109+
get_log_url_from_ti,
109110
parse,
110111
run,
111112
startup,
@@ -2144,6 +2145,8 @@ def execute(self, context):
21442145
mocked_parse(what, "basic_dag", task)
21452146

21462147
runtime_ti, context, log = startup()
2148+
assert runtime_ti is not None
2149+
assert runtime_ti.log_url == get_log_url_from_ti(runtime_ti)
21472150
assert isinstance(listener.component, TaskRunnerMarker)
21482151
del listener.component
21492152

0 commit comments

Comments
 (0)