Skip to content

Commit 9d047ff

Browse files
dstandishjose-lehmkuhl
authored andcommitted
Fix get dags query to not have join explosion (apache#50984)
Previously it was missing dag_id filter, but joining on start date would still be problematic. In this PR I refactor the query a bit so that all joins are guaranteed 1-1. To get "latest" DagRun I sort by the DagRun.id column. This is a simplifying assumption that would be more performant than sorting by start_date, since there could be more than 1 dag run with a given start date.
1 parent 7d1fcfd commit 9d047ff

File tree

2 files changed

+38
-13
lines changed
  • airflow-core
    • src/airflow/api_fastapi/core_api/routes/public
    • tests/unit/api_fastapi/core_api/routes/public

2 files changed

+38
-13
lines changed

airflow-core/src/airflow/api_fastapi/core_api/routes/public/dags.py

Lines changed: 34 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,13 @@
2222
from fastapi import Depends, HTTPException, Query, Response, status
2323
from fastapi.exceptions import RequestValidationError
2424
from pydantic import ValidationError
25-
from sqlalchemy import select, update
25+
from sqlalchemy import func, null, select, update
2626

2727
from airflow.api.common import delete_dag as delete_dag_module
2828
from airflow.api_fastapi.common.dagbag import DagBagDep
2929
from airflow.api_fastapi.common.db.common import (
3030
SessionDep,
31+
apply_filters_to_select,
3132
paginated_select,
3233
)
3334
from airflow.api_fastapi.common.db.dags import generate_dag_with_latest_run_query
@@ -115,30 +116,54 @@ def get_dags(
115116
session: SessionDep,
116117
) -> DAGCollectionResponse:
117118
"""Get all DAGs."""
118-
dag_runs_select = None
119+
query = select(DagModel)
119120

120-
if dag_run_state.value or dag_run_start_date_range.is_active() or dag_run_end_date_range.is_active():
121-
dag_runs_select, _ = paginated_select(
122-
statement=select(DagRun),
121+
max_run_id_query = ( # ordering by id will not always be "latest run", but it's a simplifying assumption
122+
select(DagRun.dag_id, func.max(DagRun.id).label("max_dag_run_id"))
123+
.where(DagRun.start_date.is_not(null()))
124+
.group_by(DagRun.dag_id)
125+
.subquery(name="mrq")
126+
)
127+
128+
has_max_run_filter = (
129+
dag_run_state.value
130+
or last_dag_run_state.value
131+
or dag_run_start_date_range.is_active()
132+
or dag_run_end_date_range.is_active()
133+
)
134+
135+
if has_max_run_filter or order_by.value in (
136+
"last_run_state",
137+
"last_run_start_date",
138+
"-last_run_state",
139+
"-last_run_start_date",
140+
):
141+
query = query.join(
142+
max_run_id_query,
143+
DagModel.dag_id == max_run_id_query.c.dag_id,
144+
isouter=True,
145+
).join(DagRun, DagRun.id == max_run_id_query.c.max_dag_run_id, isouter=True)
146+
147+
if has_max_run_filter:
148+
query = apply_filters_to_select(
149+
statement=query,
123150
filters=[
124151
dag_run_start_date_range,
125152
dag_run_end_date_range,
126153
dag_run_state,
154+
last_dag_run_state,
127155
],
128-
session=session,
129156
)
130-
dag_runs_select = dag_runs_select.cte()
131157

132158
dags_select, total_entries = paginated_select(
133-
statement=generate_dag_with_latest_run_query(dag_runs_select),
159+
statement=query,
134160
filters=[
135161
exclude_stale,
136162
paused,
137163
dag_id_pattern,
138164
dag_display_name_pattern,
139165
tags,
140166
owners,
141-
last_dag_run_state,
142167
readable_dags_filter,
143168
],
144169
order_by=order_by,

airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dags.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ class TestGetDags(TestDagEndpoint):
172172
({"last_dag_run_state": "success", "exclude_stale": False}, 1, [DAG3_ID]),
173173
({"last_dag_run_state": "failed", "exclude_stale": False}, 1, [DAG1_ID]),
174174
({"dag_run_state": "failed"}, 1, [DAG1_ID]),
175-
({"dag_run_state": "failed", "exclude_stale": False}, 2, [DAG1_ID, DAG3_ID]),
175+
({"dag_run_state": "failed", "exclude_stale": False}, 1, [DAG1_ID]),
176176
(
177177
{"dag_run_start_date_gte": DAG3_START_DATE_2.isoformat(), "exclude_stale": False},
178178
1,
@@ -220,10 +220,10 @@ class TestGetDags(TestDagEndpoint):
220220
"dag_run_state": "failed",
221221
"exclude_stale": False,
222222
},
223-
1,
224-
[DAG3_ID],
223+
0,
224+
[],
225225
),
226-
# # Sort
226+
# Sort
227227
({"order_by": "-dag_id"}, 2, [DAG2_ID, DAG1_ID]),
228228
({"order_by": "-dag_display_name"}, 2, [DAG2_ID, DAG1_ID]),
229229
({"order_by": "dag_display_name"}, 2, [DAG1_ID, DAG2_ID]),

0 commit comments

Comments
 (0)