Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,7 @@ class TaskInstanceInfo(InfoJsonEncodable):

includes = ["duration", "try_number", "pool", "queued_dttm", "log_url"]
casts = {
"log_url": lambda ti: getattr(ti, "log_url", None),
"map_index": lambda ti: ti.map_index if getattr(ti, "map_index", -1) != -1 else None,
"dag_bundle_version": lambda ti: (
ti.bundle_instance.version if hasattr(ti, "bundle_instance") else None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1499,6 +1499,7 @@ def test_taskinstance_info_af3():
runtime_ti.bundle_instance = bundle_instance

assert dict(TaskInstanceInfo(runtime_ti)) == {
"log_url": None,
"map_index": 2,
"try_number": 1,
"dag_bundle_version": "bundle_version",
Expand Down
20 changes: 20 additions & 0 deletions task-sdk/src/airflow/sdk/execution_time/task_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ class RuntimeTaskInstance(TaskInstance):

rendered_map_index: str | None = None

log_url: str | None = None

def __rich_repr__(self):
yield "id", self.id
yield "task_id", self.task_id
Expand Down Expand Up @@ -546,6 +548,23 @@ def _xcom_push_to_db(ti: RuntimeTaskInstance, key: str, value: Any) -> None:
)


def get_log_url_from_ti(ti: RuntimeTaskInstance) -> str:
from urllib.parse import quote

from airflow.configuration import conf

run_id = quote(ti.run_id)
base_url = conf.get("api", "base_url", fallback="http://localhost:8080/")
map_index_value = getattr(ti, "map_index", -1)
map_index = f"/mapped/{map_index_value}" if map_index_value is not None and map_index_value >= 0 else ""
try_number_value = getattr(ti, "try_number", 0)
try_number = (
f"?try_number={try_number_value}" if try_number_value is not None and try_number_value > 0 else ""
)
_log_uri = f"{base_url}dags/{ti.dag_id}/runs/{run_id}/tasks/{ti.task_id}{map_index}{try_number}"
return _log_uri


def parse(what: StartupDetails, log: Logger) -> RuntimeTaskInstance:
# TODO: Task-SDK:
# Using DagBag here is about 98% wrong, but it'll do for now
Expand Down Expand Up @@ -703,6 +722,7 @@ def startup() -> tuple[RuntimeTaskInstance, Context, Logger]:

with _airflow_parsing_context_manager(dag_id=msg.ti.dag_id, task_id=msg.ti.task_id):
ti = parse(msg, log)
ti.log_url = get_log_url_from_ti(ti)
log.debug("DAG file parsed", file=msg.dag_rel_path)
else:
raise RuntimeError(f"Unhandled startup message {type(msg)} {msg}")
Expand Down
3 changes: 3 additions & 0 deletions task-sdk/tests/task_sdk/execution_time/test_task_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@
_push_xcom_if_needed,
_xcom_push,
finalize,
get_log_url_from_ti,
parse,
run,
startup,
Expand Down Expand Up @@ -2144,6 +2145,8 @@ def execute(self, context):
mocked_parse(what, "basic_dag", task)

runtime_ti, context, log = startup()
assert runtime_ti is not None
assert runtime_ti.log_url == get_log_url_from_ti(runtime_ti)
assert isinstance(listener.component, TaskRunnerMarker)
del listener.component

Expand Down