Skip to content

Commit a20a812

Browse files
[v3-0-test] Apply task group sorting based on webserver config in grid structure response (#49418) (#50138)
* Apply topological sort on task group grid * Fix task_group sorting by utilizing get_task_group_children_getter to sort based on webserver config * Adjust test expectation to sort children topologically (cherry picked from commit d833ecb) Co-authored-by: Jason <[email protected]>
1 parent 378146e commit a20a812

File tree

3 files changed

+53
-54
lines changed

3 files changed

+53
-54
lines changed

airflow-core/src/airflow/api_fastapi/core_api/services/ui/grid.py

Lines changed: 3 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,6 @@
1818
from __future__ import annotations
1919

2020
import contextlib
21-
from functools import cache
22-
from operator import methodcaller
23-
from typing import Callable
2421
from uuid import UUID
2522

2623
import structlog
@@ -38,7 +35,6 @@
3835
from airflow.api_fastapi.core_api.datamodels.ui.structure import (
3936
StructureDataResponse,
4037
)
41-
from airflow.configuration import conf
4238
from airflow.models.baseoperator import BaseOperator as DBBaseOperator
4339
from airflow.models.dag_version import DagVersion
4440
from airflow.models.taskmap import TaskMap
@@ -49,20 +45,11 @@
4945
from airflow.sdk.definitions.taskgroup import MappedTaskGroup, TaskGroup
5046
from airflow.serialization.serialized_objects import SerializedDAG
5147
from airflow.utils.state import TaskInstanceState
52-
from airflow.utils.task_group import task_group_to_dict
48+
from airflow.utils.task_group import get_task_group_children_getter, task_group_to_dict
5349

5450
log = structlog.get_logger(logger_name=__name__)
5551

5652

57-
@cache
58-
def get_task_group_children_getter() -> Callable:
59-
"""Get the Task Group Children Getter for the DAG."""
60-
sort_order = conf.get("webserver", "grid_view_sorting_order")
61-
if sort_order == "topological":
62-
return methodcaller("topological_sort")
63-
return methodcaller("hierarchical_alphabetical_sort")
64-
65-
6653
def get_task_group_map(dag: DAG) -> dict[str, dict[str, Any]]:
6754
"""
6855
Get the Task Group Map for the DAG.
@@ -262,7 +249,7 @@ def fill_task_instance_summaries(
262249

263250
def get_structure_from_dag(dag: DAG) -> StructureDataResponse:
264251
"""If we do not have TIs, we just get the structure from the DAG."""
265-
nodes = [task_group_to_dict(child) for child in dag.task_group.topological_sort()]
252+
nodes = [task_group_to_dict(child) for child in get_task_group_children_getter()(dag.task_group)]
266253
return StructureDataResponse(nodes=nodes, edges=[])
267254

268255

@@ -299,7 +286,7 @@ def get_combined_structure(task_instances, session):
299286
if serdag:
300287
dags.append(serdag.dag)
301288
for dag in dags:
302-
nodes = [task_group_to_dict(child) for child in dag.task_group.topological_sort()]
289+
nodes = [task_group_to_dict(child) for child in get_task_group_children_getter()(dag.task_group)]
303290
_merge_node_dicts(merged_nodes, nodes)
304291

305292
return StructureDataResponse(nodes=merged_nodes, edges=[])

airflow-core/src/airflow/utils/task_group.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,12 @@
1919

2020
from __future__ import annotations
2121

22-
from typing import TYPE_CHECKING
22+
from functools import cache
23+
from operator import methodcaller
24+
from typing import TYPE_CHECKING, Callable
2325

2426
import airflow.sdk.definitions.taskgroup
27+
from airflow.configuration import conf
2528

2629
if TYPE_CHECKING:
2730
from airflow.typing_compat import TypeAlias
@@ -30,6 +33,15 @@
3033
MappedTaskGroup: TypeAlias = airflow.sdk.definitions.taskgroup.MappedTaskGroup
3134

3235

36+
@cache
37+
def get_task_group_children_getter() -> Callable:
38+
"""Get the Task Group Children Getter for the DAG."""
39+
sort_order = conf.get("webserver", "grid_view_sorting_order")
40+
if sort_order == "topological":
41+
return methodcaller("topological_sort")
42+
return methodcaller("hierarchical_alphabetical_sort")
43+
44+
3345
def task_group_to_dict(task_item_or_group, parent_group_is_mapped=False):
3446
"""Create a nested dict representation of this TaskGroup and its children used to construct the Graph."""
3547
from airflow.sdk.bases.operator import BaseOperator
@@ -63,7 +75,7 @@ def task_group_to_dict(task_item_or_group, parent_group_is_mapped=False):
6375
is_mapped = isinstance(task_group, MappedTaskGroup)
6476
children = [
6577
task_group_to_dict(child, parent_group_is_mapped=parent_group_is_mapped or is_mapped)
66-
for child in sorted(task_group.children.values(), key=lambda t: t.label)
78+
for child in get_task_group_children_getter()(task_group)
6779
]
6880

6981
if task_group.upstream_group_ids or task_group.upstream_task_ids:

airflow-core/tests/unit/utils/test_task_group.py

Lines changed: 36 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,16 @@ def my_task():
6565
"tooltip": "",
6666
},
6767
"children": [
68+
{
69+
"id": "task1",
70+
"value": {
71+
"label": "task1",
72+
"labelStyle": "fill:#000;",
73+
"style": "fill:#e8f7e4;",
74+
"rx": 5,
75+
"ry": 5,
76+
},
77+
},
6878
{
6979
"id": "group234",
7080
"value": {
@@ -78,6 +88,16 @@ def my_task():
7888
"isMapped": False,
7989
},
8090
"children": [
91+
{
92+
"id": "group234.task2",
93+
"value": {
94+
"label": "task2",
95+
"labelStyle": "fill:#000;",
96+
"style": "fill:#e8f7e4;",
97+
"rx": 5,
98+
"ry": 5,
99+
},
100+
},
81101
{
82102
"id": "group234.group34",
83103
"value": {
@@ -122,16 +142,6 @@ def my_task():
122142
},
123143
],
124144
},
125-
{
126-
"id": "group234.task2",
127-
"value": {
128-
"label": "task2",
129-
"labelStyle": "fill:#000;",
130-
"style": "fill:#e8f7e4;",
131-
"rx": 5,
132-
"ry": 5,
133-
},
134-
},
135145
{
136146
"id": "group234.upstream_join_id",
137147
"value": {
@@ -143,16 +153,6 @@ def my_task():
143153
},
144154
],
145155
},
146-
{
147-
"id": "task1",
148-
"value": {
149-
"label": "task1",
150-
"labelStyle": "fill:#000;",
151-
"style": "fill:#e8f7e4;",
152-
"rx": 5,
153-
"ry": 5,
154-
},
155-
},
156156
{
157157
"id": "task5",
158158
"value": {
@@ -172,12 +172,14 @@ def my_task():
172172
"tooltip": "",
173173
"is_mapped": False,
174174
"children": [
175+
{"id": "task1", "label": "task1", "operator": "EmptyOperator", "type": "task"},
175176
{
176177
"id": "group234",
177178
"label": "group234",
178179
"tooltip": "",
179180
"is_mapped": False,
180181
"children": [
182+
{"id": "group234.task2", "label": "task2", "operator": "EmptyOperator", "type": "task"},
181183
{
182184
"id": "group234.group34",
183185
"label": "group34",
@@ -200,12 +202,10 @@ def my_task():
200202
],
201203
"type": "task",
202204
},
203-
{"id": "group234.task2", "label": "task2", "operator": "EmptyOperator", "type": "task"},
204205
{"id": "group234.upstream_join_id", "label": "", "type": "join"},
205206
],
206207
"type": "task",
207208
},
208-
{"id": "task1", "label": "task1", "operator": "EmptyOperator", "type": "task"},
209209
{"id": "task5", "label": "task5", "operator": "EmptyOperator", "type": "task"},
210210
],
211211
"type": "task",
@@ -314,28 +314,28 @@ def test_build_task_group_with_prefix():
314314
"id": None,
315315
"label": None,
316316
"children": [
317+
{"id": "task1", "label": "task1"},
317318
{
318319
"id": "group234",
319320
"label": "group234",
320321
"children": [
322+
{"id": "task2", "label": "task2"},
321323
{
322324
"id": "group34",
323325
"label": "group34",
324326
"children": [
327+
{"id": "group34.task3", "label": "task3"},
325328
{
326329
"id": "group34.group4",
327330
"label": "group4",
328331
"children": [{"id": "task4", "label": "task4"}],
329332
},
330-
{"id": "group34.task3", "label": "task3"},
331333
{"id": "group34.downstream_join_id", "label": ""},
332334
],
333335
},
334-
{"id": "task2", "label": "task2"},
335336
{"id": "group234.upstream_join_id", "label": ""},
336337
],
337338
},
338-
{"id": "task1", "label": "task1"},
339339
{"id": "task5", "label": "task5"},
340340
],
341341
}
@@ -389,6 +389,7 @@ def task_5():
389389
expected_node_id = {
390390
"id": None,
391391
"children": [
392+
{"id": "task_1"},
392393
{
393394
"id": "group234",
394395
"children": [
@@ -399,7 +400,6 @@ def task_5():
399400
{"id": "group234.downstream_join_id"},
400401
],
401402
},
402-
{"id": "task_1"},
403403
{"id": "task_5"},
404404
],
405405
}
@@ -448,6 +448,7 @@ def test_sub_dag_task_group():
448448
expected_node_id = {
449449
"id": None,
450450
"children": [
451+
{"id": "task1"},
451452
{
452453
"id": "group234",
453454
"children": [
@@ -462,7 +463,6 @@ def test_sub_dag_task_group():
462463
{"id": "group234.upstream_join_id"},
463464
],
464465
},
465-
{"id": "task1"},
466466
{"id": "task5"},
467467
],
468468
}
@@ -540,6 +540,7 @@ def test_dag_edges():
540540
expected_node_id = {
541541
"id": None,
542542
"children": [
543+
{"id": "task1"},
543544
{
544545
"id": "group_a",
545546
"children": [
@@ -567,6 +568,8 @@ def test_dag_edges():
567568
{"id": "group_c.downstream_join_id"},
568569
],
569570
},
571+
{"id": "task9"},
572+
{"id": "task10"},
570573
{
571574
"id": "group_d",
572575
"children": [
@@ -575,9 +578,6 @@ def test_dag_edges():
575578
{"id": "group_d.upstream_join_id"},
576579
],
577580
},
578-
{"id": "task1"},
579-
{"id": "task10"},
580-
{"id": "task9"},
581581
],
582582
}
583583

@@ -818,22 +818,22 @@ def section_2(value2):
818818
node_ids = {
819819
"id": None,
820820
"children": [
821+
{"id": "task_start"},
821822
{
822823
"id": "section_1",
823824
"children": [
825+
{"id": "section_1.task_1"},
826+
{"id": "section_1.task_2"},
824827
{
825828
"id": "section_1.section_2",
826829
"children": [
827830
{"id": "section_1.section_2.task_3"},
828831
{"id": "section_1.section_2.task_4"},
829832
],
830833
},
831-
{"id": "section_1.task_1"},
832-
{"id": "section_1.task_2"},
833834
],
834835
},
835836
{"id": "task_end"},
836-
{"id": "task_start"},
837837
],
838838
}
839839

@@ -992,6 +992,7 @@ def section_2(value):
992992
node_ids = {
993993
"id": None,
994994
"children": [
995+
{"id": "task_start"},
995996
{
996997
"id": "section_1",
997998
"children": [
@@ -1011,7 +1012,6 @@ def section_2(value):
10111012
],
10121013
},
10131014
{"id": "task_end"},
1014-
{"id": "task_start"},
10151015
],
10161016
}
10171017

@@ -1153,17 +1153,17 @@ def task_group1(name: str):
11531153
{
11541154
"id": "task_group1",
11551155
"children": [
1156-
{"id": "task_group1.end_task"},
11571156
{"id": "task_group1.start_task"},
11581157
{"id": "task_group1.task"},
1158+
{"id": "task_group1.end_task"},
11591159
],
11601160
},
11611161
{
11621162
"id": "task_group1__1",
11631163
"children": [
1164-
{"id": "task_group1__1.end_task"},
11651164
{"id": "task_group1__1.start_task"},
11661165
{"id": "task_group1__1.task"},
1166+
{"id": "task_group1__1.end_task"},
11671167
],
11681168
},
11691169
],

0 commit comments

Comments
 (0)