Skip to content

Commit b01e534

Browse files
authored
Remove some lingering subdag references (#49663)
1 parent a497e7c commit b01e534

File tree

6 files changed

+24
-44
lines changed

6 files changed

+24
-44
lines changed

airflow-core/src/airflow/models/dag.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1251,7 +1251,7 @@ def set_task_instance_state(
12511251
# Clear downstream tasks that are in failed/upstream_failed state to resume them.
12521252
# Flush the session so that the tasks marked success are reflected in the db.
12531253
session.flush()
1254-
subdag = self.partial_subset(
1254+
subset = self.partial_subset(
12551255
task_ids={task_id},
12561256
include_downstream=True,
12571257
include_upstream=False,
@@ -1273,9 +1273,9 @@ def set_task_instance_state(
12731273
}
12741274
if not future and not past: # Simple case 1: we're only dealing with exactly one run.
12751275
clear_kwargs["run_id"] = run_id
1276-
subdag.clear(**clear_kwargs)
1276+
subset.clear(**clear_kwargs)
12771277
elif future and past: # Simple case 2: we're clearing ALL runs.
1278-
subdag.clear(**clear_kwargs)
1278+
subset.clear(**clear_kwargs)
12791279
else: # Complex cases: we may have more than one run, based on a date range.
12801280
# Make 'future' and 'past' make some sense when multiple runs exist
12811281
# for the same logical date. We order runs by their id and only
@@ -1287,7 +1287,7 @@ def set_task_instance_state(
12871287
else:
12881288
clear_kwargs["end_date"] = logical_date
12891289
exclude_run_id_stmt = exclude_run_id_stmt.where(DagRun.id < dr_id)
1290-
subdag.clear(exclude_run_ids=frozenset(session.scalars(exclude_run_id_stmt)), **clear_kwargs)
1290+
subset.clear(exclude_run_ids=frozenset(session.scalars(exclude_run_id_stmt)), **clear_kwargs)
12911291
return altered
12921292

12931293
@provide_session
@@ -1363,13 +1363,13 @@ def get_logical_date() -> datetime:
13631363
# Clear downstream tasks that are in failed/upstream_failed state to resume them.
13641364
# Flush the session so that the tasks marked success are reflected in the db.
13651365
session.flush()
1366-
task_subset = self.partial_subset(
1366+
subset = self.partial_subset(
13671367
task_ids=task_ids,
13681368
include_downstream=True,
13691369
include_upstream=False,
13701370
)
13711371

1372-
task_subset.clear(
1372+
subset.clear(
13731373
start_date=start_date,
13741374
end_date=end_date,
13751375
only_failed=True,

airflow-core/src/airflow/models/dagbag.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -505,8 +505,8 @@ def bag_dag(self, dag: DAG):
505505
"""
506506
Add the DAG into the bag.
507507
508-
:raises: AirflowDagCycleException if a cycle is detected in this dag or its subdags.
509-
:raises: AirflowDagDuplicatedIdException if this dag or its subdags already exists in the bag.
508+
:raises: AirflowDagCycleException if a cycle is detected.
509+
:raises: AirflowDagDuplicatedIdException if this dag already exists in the bag.
510510
"""
511511
check_cycle(dag) # throws if a task cycle is found
512512

airflow-core/src/airflow/security/permissions.py

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -95,13 +95,7 @@ class ResourceDetails(TypedDict):
9595

9696

9797
def resource_name(root_dag_id: str, resource: str) -> str:
98-
"""
99-
Return the resource name for a DAG id.
100-
101-
Note that since a sub-DAG should follow the permission of its
102-
parent DAG, you should pass ``DagModel.root_dag_id`` to this function,
103-
for a subdag. A normal dag should pass the ``DagModel.dag_id``.
104-
"""
98+
"""Return the resource name for a DAG id."""
10599
if root_dag_id in RESOURCE_DETAILS_MAP.keys():
106100
return root_dag_id
107101
if root_dag_id.startswith(tuple(PREFIX_RESOURCES_MAP.keys())):
@@ -113,10 +107,6 @@ def resource_name_for_dag(root_dag_id: str) -> str:
113107
"""
114108
Return the resource name for a DAG id.
115109
116-
Note that since a sub-DAG should follow the permission of its
117-
parent DAG, you should pass ``DagModel.root_dag_id`` to this function,
118-
for a subdag. A normal dag should pass the ``DagModel.dag_id``.
119-
120110
Note: This function is kept for backwards compatibility.
121111
"""
122112
if root_dag_id == RESOURCE_DAG:

airflow-core/tests/unit/utils/test_task_group.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -443,7 +443,7 @@ def test_sub_dag_task_group():
443443
group234 >> group6
444444
group234 >> task7
445445

446-
subdag = dag.partial_subset(task_ids="task5", include_upstream=True, include_downstream=False)
446+
subset = dag.partial_subset(task_ids="task5", include_upstream=True, include_downstream=False)
447447

448448
expected_node_id = {
449449
"id": None,
@@ -467,9 +467,9 @@ def test_sub_dag_task_group():
467467
],
468468
}
469469

470-
assert extract_node_id(task_group_to_dict(subdag.task_group)) == expected_node_id
470+
assert extract_node_id(task_group_to_dict(subset.task_group)) == expected_node_id
471471

472-
edges = dag_edges(subdag)
472+
edges = dag_edges(subset)
473473
assert sorted((e["source_id"], e["target_id"]) for e in edges) == [
474474
("group234.group34.downstream_join_id", "task5"),
475475
("group234.group34.task3", "group234.group34.downstream_join_id"),
@@ -479,19 +479,19 @@ def test_sub_dag_task_group():
479479
("task1", "group234.upstream_join_id"),
480480
]
481481

482-
subdag_task_groups = subdag.task_group.get_task_group_dict()
483-
assert subdag_task_groups.keys() == {None, "group234", "group234.group34"}
482+
groups = subset.task_group.get_task_group_dict()
483+
assert groups.keys() == {None, "group234", "group234.group34"}
484484

485485
included_group_ids = {"group234", "group234.group34"}
486486
included_task_ids = {"group234.group34.task3", "group234.group34.task4", "task1", "task5"}
487487

488-
for task_group in subdag_task_groups.values():
488+
for task_group in groups.values():
489489
assert task_group.upstream_group_ids.issubset(included_group_ids)
490490
assert task_group.downstream_group_ids.issubset(included_group_ids)
491491
assert task_group.upstream_task_ids.issubset(included_task_ids)
492492
assert task_group.downstream_task_ids.issubset(included_task_ids)
493493

494-
for task in subdag.task_group:
494+
for task in subset.task_group:
495495
assert task.upstream_task_ids.issubset(included_task_ids)
496496
assert task.downstream_task_ids.issubset(included_task_ids)
497497

providers/fab/src/airflow/providers/fab/www/security/permissions.py

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -95,13 +95,7 @@ class ResourceDetails(TypedDict):
9595

9696

9797
def resource_name(root_dag_id: str, resource: str) -> str:
98-
"""
99-
Return the resource name for a DAG id.
100-
101-
Note that since a sub-DAG should follow the permission of its
102-
parent DAG, you should pass ``DagModel.root_dag_id`` to this function,
103-
for a subdag. A normal dag should pass the ``DagModel.dag_id``.
104-
"""
98+
"""Return the resource name for a DAG id."""
10599
if root_dag_id in RESOURCE_DETAILS_MAP.keys():
106100
return root_dag_id
107101
if root_dag_id.startswith(tuple(PREFIX_RESOURCES_MAP.keys())):
@@ -113,10 +107,6 @@ def resource_name_for_dag(root_dag_id: str) -> str:
113107
"""
114108
Return the resource name for a DAG id.
115109
116-
Note that since a sub-DAG should follow the permission of its
117-
parent DAG, you should pass ``DagModel.root_dag_id`` to this function,
118-
for a subdag. A normal dag should pass the ``DagModel.dag_id``.
119-
120110
Note: This function is kept for backwards compatibility.
121111
"""
122112
if root_dag_id == RESOURCE_DAG:

task-sdk/src/airflow/sdk/definitions/dag.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -831,7 +831,7 @@ def _deepcopy_task(t) -> Operator:
831831
}
832832

833833
def filter_task_group(group, parent_group):
834-
"""Exclude tasks not included in the subdag from the given TaskGroup."""
834+
"""Exclude tasks not included in the partial dag from the given TaskGroup."""
835835
# We want to deepcopy _most but not all_ attributes of the task group, so we create a shallow copy
836836
# and then manually deep copy the instances. (memo argument to deepcopy only works for instances
837837
# of classes, not "native" properties of an instance)
@@ -867,12 +867,12 @@ def filter_task_group(group, parent_group):
867867

868868
# Removing upstream/downstream references to tasks and TaskGroups that did not make
869869
# the cut.
870-
subdag_task_groups = dag.task_group.get_task_group_dict()
871-
for group in subdag_task_groups.values():
872-
group.upstream_group_ids.intersection_update(subdag_task_groups)
873-
group.downstream_group_ids.intersection_update(subdag_task_groups)
874-
group.upstream_task_ids.intersection_update(dag.task_dict)
875-
group.downstream_task_ids.intersection_update(dag.task_dict)
870+
groups = dag.task_group.get_task_group_dict()
871+
for g in groups.values():
872+
g.upstream_group_ids.intersection_update(groups)
873+
g.downstream_group_ids.intersection_update(groups)
874+
g.upstream_task_ids.intersection_update(dag.task_dict)
875+
g.downstream_task_ids.intersection_update(dag.task_dict)
876876

877877
for t in dag.tasks:
878878
# Removing upstream/downstream references to tasks that did not

0 commit comments

Comments
 (0)