Skip to content

Commit 5ef0012

Browse files
ayush3singhkaxil
authored andcommitted
Fixes issue RuntimeTaskInstance does not contain log_url | added during taskrunner startup (#50376)
(cherry picked from commit 330789e)
1 parent 8d120ee commit 5ef0012

File tree

2 files changed

+23
-0
lines changed

2 files changed

+23
-0
lines changed

task-sdk/src/airflow/sdk/execution_time/task_runner.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,8 @@ class RuntimeTaskInstance(TaskInstance):
144144

145145
rendered_map_index: str | None = None
146146

147+
log_url: str | None = None
148+
147149
def __rich_repr__(self):
148150
yield "id", self.id
149151
yield "task_id", self.task_id
@@ -549,6 +551,23 @@ def _xcom_push_to_db(ti: RuntimeTaskInstance, key: str, value: Any) -> None:
549551
)
550552

551553

554+
def get_log_url_from_ti(ti: RuntimeTaskInstance) -> str:
555+
from urllib.parse import quote
556+
557+
from airflow.configuration import conf
558+
559+
run_id = quote(ti.run_id)
560+
base_url = conf.get("api", "base_url", fallback="http://localhost:8080/")
561+
map_index_value = getattr(ti, "map_index", -1)
562+
map_index = f"/mapped/{map_index_value}" if map_index_value is not None and map_index_value >= 0 else ""
563+
try_number_value = getattr(ti, "try_number", 0)
564+
try_number = (
565+
f"?try_number={try_number_value}" if try_number_value is not None and try_number_value > 0 else ""
566+
)
567+
_log_uri = f"{base_url}dags/{ti.dag_id}/runs/{run_id}/tasks/{ti.task_id}{map_index}{try_number}"
568+
return _log_uri
569+
570+
552571
def parse(what: StartupDetails, log: Logger) -> RuntimeTaskInstance:
553572
# TODO: Task-SDK:
554573
# Using DagBag here is about 98% wrong, but it'll do for now
@@ -677,6 +696,7 @@ def startup() -> tuple[RuntimeTaskInstance, Context, Logger]:
677696

678697
with _airflow_parsing_context_manager(dag_id=msg.ti.dag_id, task_id=msg.ti.task_id):
679698
ti = parse(msg, log)
699+
ti.log_url = get_log_url_from_ti(ti)
680700
log.debug("DAG file parsed", file=msg.dag_rel_path)
681701

682702
run_as_user = getattr(ti.task, "run_as_user", None) or conf.get(

task-sdk/tests/task_sdk/execution_time/test_task_runner.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@
106106
_push_xcom_if_needed,
107107
_xcom_push,
108108
finalize,
109+
get_log_url_from_ti,
109110
parse,
110111
run,
111112
startup,
@@ -2206,6 +2207,8 @@ def execute(self, context):
22062207
mocked_parse(what, "basic_dag", task)
22072208

22082209
runtime_ti, context, log = startup()
2210+
assert runtime_ti is not None
2211+
assert runtime_ti.log_url == get_log_url_from_ti(runtime_ti)
22092212
assert isinstance(listener.component, TaskRunnerMarker)
22102213
del listener.component
22112214

0 commit comments

Comments
 (0)