Skip to content

Commit 4d6de80

Browse files
committed
Fall back when a task cannot be expanded
When rendering a dag in the API, we must take account for cases when a mapped task cannot be expanded, either due to the upstream has not been resolved, or cannot provide a value due to failure. When this happens, we simply count 1 ti for the unexpanded task. This is how the scheduler represent the task internally, and should be a good enough value for UI representation.
1 parent 59e7606 commit 4d6de80

File tree

3 files changed

+79
-16
lines changed

3 files changed

+79
-16
lines changed

airflow-core/src/airflow/api_fastapi/core_api/services/ui/grid.py

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
from __future__ import annotations
1919

20+
import contextlib
2021
from functools import cache
2122
from operator import methodcaller
2223
from typing import Callable
@@ -42,6 +43,8 @@
4243
from airflow.models.dag_version import DagVersion
4344
from airflow.models.taskmap import TaskMap
4445
from airflow.sdk import BaseOperator
46+
from airflow.sdk.definitions._internal.abstractoperator import NotMapped
47+
from airflow.sdk.definitions._internal.expandinput import NotFullyPopulated
4548
from airflow.sdk.definitions.mappedoperator import MappedOperator
4649
from airflow.sdk.definitions.taskgroup import MappedTaskGroup, TaskGroup
4750
from airflow.serialization.serialized_objects import SerializedDAG
@@ -140,19 +143,14 @@ def get_child_task_map(parent_task_id: str, task_node_map: dict[str, dict[str, A
140143
return [task_id for task_id, task_map in task_node_map.items() if task_map["parent_id"] == parent_task_id]
141144

142145

143-
def _get_total_task_count(
144-
run_id: str, task_count: list[int | MappedTaskGroup | MappedOperator], session: SessionDep
145-
) -> int:
146-
return sum(
147-
node
148-
if isinstance(node, int)
149-
else (
150-
DBBaseOperator.get_mapped_ti_count(node, run_id=run_id, session=session) or 0
151-
if isinstance(node, (MappedTaskGroup, MappedOperator))
152-
else node
153-
)
154-
for node in task_count
155-
)
146+
def _count_tis(node: int | MappedTaskGroup | MappedOperator, run_id: str, session: SessionDep) -> int:
147+
if not isinstance(node, (MappedTaskGroup, MappedOperator)):
148+
return node
149+
with contextlib.suppress(NotFullyPopulated, NotMapped):
150+
return DBBaseOperator.get_mapped_ti_count(node, run_id=run_id, session=session)
151+
# If the downstream is not actually mapped, or we don't have information to
152+
# determine the length yet, simply return 1 to represent the stand-in ti.
153+
return 1
156154

157155

158156
def fill_task_instance_summaries(
@@ -253,7 +251,7 @@ def fill_task_instance_summaries(
253251
end_date=ti_end_date,
254252
queued_dttm=ti_queued_dttm,
255253
child_states=child_states,
256-
task_count=_get_total_task_count(run_id, task_node_map[task_id]["task_count"], session),
254+
task_count=sum(_count_tis(n, run_id, session) for n in task_node_map[task_id]["task_count"]),
257255
state=TaskInstanceState[overall_ti_state.upper()]
258256
if overall_ti_state != "no_status"
259257
else None,

airflow-core/src/airflow/models/baseoperator.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -621,7 +621,7 @@ def get_mapped_ti_count(cls, task: DAGNode, run_id: str, *, session: Session) ->
621621
raise NotImplementedError(f"Not implemented for {type(task)}")
622622

623623
# https://github.com/python/cpython/issues/86153
624-
# WHile we support Python 3.9 we can't rely on the type hint, we need to pass the type explicitly to
624+
# While we support Python 3.9 we can't rely on the type hint, we need to pass the type explicitly to
625625
# register.
626626
@get_mapped_ti_count.register(TaskSDKAbstractOperator)
627627
@classmethod

airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_grid.py

Lines changed: 66 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
TASK_ID_4 = "task4"
4949
SUB_TASK_ID = "subtask"
5050
MAPPED_TASK_ID = "mapped_task"
51+
MAPPED_TASK_ID_2 = "mapped_task_2"
5152
TASK_GROUP_ID = "task_group"
5253
INNER_TASK_GROUP = "inner_task_group"
5354
INNER_TASK_GROUP_SUB_TASK = "inner_task_group_sub_task"
@@ -140,6 +141,31 @@
140141
"task_id": "task_group",
141142
"try_number": 0,
142143
},
144+
{
145+
"child_states": {
146+
"deferred": 0,
147+
"failed": 0,
148+
"no_status": 0,
149+
"queued": 0,
150+
"removed": 0,
151+
"restarting": 0,
152+
"running": 0,
153+
"scheduled": 0,
154+
"skipped": 0,
155+
"success": 1,
156+
"up_for_reschedule": 0,
157+
"up_for_retry": 0,
158+
"upstream_failed": 0,
159+
},
160+
"end_date": None,
161+
"note": None,
162+
"queued_dttm": None,
163+
"start_date": None,
164+
"state": "success",
165+
"task_count": 1,
166+
"task_id": "mapped_task_2",
167+
"try_number": 0,
168+
},
143169
{
144170
"child_states": {
145171
"deferred": 0,
@@ -331,6 +357,31 @@
331357
"task_id": "task_group",
332358
"try_number": 0,
333359
},
360+
{
361+
"child_states": {
362+
"deferred": 0,
363+
"failed": 0,
364+
"no_status": 1,
365+
"queued": 0,
366+
"removed": 0,
367+
"restarting": 0,
368+
"running": 0,
369+
"scheduled": 0,
370+
"skipped": 0,
371+
"success": 0,
372+
"up_for_reschedule": 0,
373+
"up_for_retry": 0,
374+
"upstream_failed": 0,
375+
},
376+
"end_date": None,
377+
"note": None,
378+
"queued_dttm": None,
379+
"start_date": None,
380+
"state": None,
381+
"task_count": 1,
382+
"task_id": "mapped_task_2",
383+
"try_number": 0,
384+
},
334385
{
335386
"child_states": {
336387
"deferred": 0,
@@ -517,6 +568,17 @@
517568
"tooltip": "",
518569
"type": "task",
519570
},
571+
{
572+
"asset_condition_type": None,
573+
"children": None,
574+
"id": "mapped_task_2",
575+
"is_mapped": True,
576+
"label": "mapped_task_2",
577+
"operator": "MockOperator",
578+
"setup_teardown_type": None,
579+
"tooltip": None,
580+
"type": "task",
581+
},
520582
],
521583
}
522584

@@ -536,7 +598,7 @@ def setup(dag_maker, session=None):
536598

537599
# DAG 1
538600
with dag_maker(dag_id=DAG_ID, serialized=True, session=session) as dag:
539-
EmptyOperator(task_id=TASK_ID)
601+
task = EmptyOperator(task_id=TASK_ID)
540602

541603
@task_group
542604
def mapped_task_group(arg1):
@@ -549,6 +611,9 @@ def mapped_task_group(arg1):
549611
with TaskGroup(group_id=INNER_TASK_GROUP):
550612
MockOperator.partial(task_id=INNER_TASK_GROUP_SUB_TASK).expand(arg1=["a", "b"])
551613

614+
# Mapped but never expanded. API should not crash, but count this as one no-status ti.
615+
MockOperator.partial(task_id=MAPPED_TASK_ID_2).expand(arg1=task.output)
616+
552617
triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST}
553618
logical_date = timezone.datetime(2024, 11, 30)
554619
data_interval = dag.timetable.infer_manual_data_interval(run_after=logical_date)

0 commit comments

Comments
 (0)