Skip to content

Commit a591c6d

Browse files
Ayushayush369
authored andcommitted
Fixes issue RuntimeTaskInstance context does not contain log_url
1 parent ce6e31b commit a591c6d

File tree

3 files changed

+37
-0
lines changed

3 files changed

+37
-0
lines changed

providers/openlineage/tests/unit/openlineage/utils/test_utils.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1499,6 +1499,7 @@ def test_taskinstance_info_af3():
14991499
runtime_ti.bundle_instance = bundle_instance
15001500

15011501
assert dict(TaskInstanceInfo(runtime_ti)) == {
1502+
"log_url": None,
15021503
"map_index": 2,
15031504
"try_number": 1,
15041505
"dag_bundle_version": "bundle_version",

task-sdk/src/airflow/sdk/execution_time/task_runner.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,10 @@ class RuntimeTaskInstance(TaskInstance):
137137

138138
rendered_map_index: str | None = None
139139

140+
log_url: str | None = None
141+
142+
mark_success_url: str | None = None
143+
140144
def __rich_repr__(self):
141145
yield "id", self.id
142146
yield "task_id", self.task_id
@@ -148,6 +152,27 @@ def __rich_repr__(self):
148152

149153
__rich_repr__.angular = True # type: ignore[attr-defined]
150154

155+
def get_mark_success_url(self):
156+
return self.log_url
157+
158+
def get_log_url_from_ti(self):
159+
from urllib.parse import quote
160+
161+
from airflow.configuration import conf
162+
163+
run_id = quote(self.run_id)
164+
base_url = conf.get("api", "base_url", fallback="http://localhost:8080/")
165+
map_index_value = getattr(self, "map_index", -1)
166+
map_index = (
167+
f"/mapped/{map_index_value}" if map_index_value is not None and map_index_value >= 0 else ""
168+
)
169+
try_number_value = getattr(self, "try_number", 0)
170+
try_number = (
171+
f"?try_number={try_number_value}" if try_number_value is not None and try_number_value > 0 else ""
172+
)
173+
_log_uri = f"{base_url}dags/{self.dag_id}/runs/{run_id}/tasks/{self.task_id}{map_index}{try_number}"
174+
return _log_uri
175+
151176
def get_template_context(self) -> Context:
152177
# TODO: Move this to `airflow.sdk.execution_time.context`
153178
# once we port the entire context logic from airflow/utils/context.py ?
@@ -158,6 +183,9 @@ def get_template_context(self) -> Context:
158183

159184
validated_params = process_params(self.task.dag, self.task, dag_run_conf, suppress_exception=False)
160185

186+
self.log_url = self.get_log_url_from_ti()
187+
self.mark_success_url = self.get_mark_success_url()
188+
161189
context: Context = {
162190
# From the Task Execution interface
163191
"dag": self.task.dag,

task-sdk/tests/task_sdk/execution_time/test_task_runner.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1028,6 +1028,8 @@ def test_get_context_without_ti_context_from_server(self, mocked_parse, make_ti_
10281028
_ti_context_from_server=None,
10291029
start_date=start_date,
10301030
)
1031+
runtime_ti.log_url = runtime_ti.get_log_url_from_ti()
1032+
10311033
context = runtime_ti.get_template_context()
10321034

10331035
# Verify the context keys and values
@@ -1061,6 +1063,7 @@ def test_get_context_with_ti_context_from_server(self, create_runtime_ti, mock_s
10611063
# `task_sdk/tests/api/test_client.py::test_task_instance_start` checks the context is received
10621064
# from the API server
10631065
runtime_ti = create_runtime_ti(task=task, dag_id="basic_task")
1066+
runtime_ti.log_url = runtime_ti.get_log_url_from_ti()
10641067

10651068
dr = runtime_ti._ti_context_from_server.dag_run
10661069

@@ -1113,6 +1116,7 @@ def test_lazy_loading_not_triggered_until_accessed(self, create_runtime_ti, mock
11131116
"""Ensure lazy-loaded attributes are not resolved until accessed."""
11141117
task = BaseOperator(task_id="hello")
11151118
runtime_ti = create_runtime_ti(task=task, dag_id="basic_task")
1119+
runtime_ti.log_url = runtime_ti.get_log_url_from_ti()
11161120

11171121
mock_supervisor_comms.get_message.return_value = PrevSuccessfulDagRunResult(
11181122
data_interval_end=timezone.datetime(2025, 1, 1, 2, 0, 0),
@@ -1149,6 +1153,7 @@ def test_get_connection_from_context(self, create_runtime_ti, mock_supervisor_co
11491153
)
11501154

11511155
runtime_ti = create_runtime_ti(task=task, dag_id="test_get_connection_from_context")
1156+
runtime_ti.log_url = runtime_ti.get_log_url_from_ti()
11521157
mock_supervisor_comms.get_message.return_value = conn
11531158

11541159
context = runtime_ti.get_template_context()
@@ -1185,6 +1190,7 @@ def test_template_render(self, create_runtime_ti):
11851190
task = BaseOperator(task_id="test_template_render_task")
11861191

11871192
runtime_ti = create_runtime_ti(task=task, dag_id="test_template_render")
1193+
runtime_ti.log_url = runtime_ti.get_log_url_from_ti()
11881194
template_context = runtime_ti.get_template_context()
11891195
result = runtime_ti.task.render_template(
11901196
"Task: {{ dag.dag_id }} -> {{ task.task_id }}", template_context
@@ -1212,6 +1218,7 @@ def test_template_with_connection(
12121218
"""
12131219
task = BaseOperator(task_id="hello")
12141220
runtime_ti = create_runtime_ti(task=task, dag_id="test_template_with_connection")
1221+
runtime_ti.log_url = runtime_ti.get_log_url_from_ti()
12151222

12161223
conn = ConnectionResult(
12171224
conn_id="a_connection",
@@ -1247,6 +1254,7 @@ def test_get_variable_from_context(
12471254

12481255
task = BaseOperator(task_id="hello")
12491256
runtime_ti = create_runtime_ti(task=task)
1257+
runtime_ti.log_url = runtime_ti.get_log_url_from_ti()
12501258

12511259
var = VariableResult(key="test_key", value=var_value)
12521260

0 commit comments

Comments
 (0)