Skip to content

Commit a992b62

Browse files
Merge branch 'main' into issue-50000
2 parents a06c524 + b994bb2 commit a992b62

File tree

22 files changed

+477
-173
lines changed

22 files changed

+477
-173
lines changed

.github/CODEOWNERS

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737

3838
# Translations
3939
airflow-core/src/airflow/ui/src/i18n/locales/de/ @jscheffl
40-
airflow-core/src/airflow/ui/src/i18n/locales/zh_TW/ @Lee-W
40+
airflow-core/src/airflow/ui/src/i18n/locales/zh-TW/ @Lee-W
4141
airflow-core/src/airflow/ui/src/i18n/locales/nl/ @BasPH # not codeowner but engaged: @DjVinnii
4242

4343
# Security/Permissions

ISSUE_TRIAGE_PROCESS.rst

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,7 @@ associated with them such as ``provider:amazon-aws``, ``provider:microsoft-azure
196196
These make it easier for developers working on a single provider to
197197
track issues for that provider.
198198

199-
Note: each provider has it's own unique label. It is possible for issue to be tagged with more than 1 provider label.
199+
Note: each provider has its own unique label. It is possible for an issue to be tagged with more than 1 provider label.
200200

201201
Most issues need a combination of "kind" and "area" labels to be actionable.
202202
For example:

airflow-core/docs/tutorial/fundamentals.rst

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ each line in detail.
4343
Understanding the DAG Definition File
4444
-------------------------------------
4545
Think of the Airflow Python script as a configuration file that lays out the structure of your DAG in code. The actual
46-
tasks you define here run in a different environment, which means this script isn't meant for data processing. It's main
46+
tasks you define here run in a different environment, which means this script isn't meant for data processing. Its main
4747
job is to define the DAG object, and it needs to evaluate quickly since the DAG File Processor checks it regularly for
4848
any changes.
4949

airflow-core/src/airflow/api_fastapi/core_api/routes/public/dags.py

Lines changed: 34 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,13 @@
2222
from fastapi import Depends, HTTPException, Query, Response, status
2323
from fastapi.exceptions import RequestValidationError
2424
from pydantic import ValidationError
25-
from sqlalchemy import select, update
25+
from sqlalchemy import func, null, select, update
2626

2727
from airflow.api.common import delete_dag as delete_dag_module
2828
from airflow.api_fastapi.common.dagbag import DagBagDep
2929
from airflow.api_fastapi.common.db.common import (
3030
SessionDep,
31+
apply_filters_to_select,
3132
paginated_select,
3233
)
3334
from airflow.api_fastapi.common.db.dags import generate_dag_with_latest_run_query
@@ -115,30 +116,54 @@ def get_dags(
115116
session: SessionDep,
116117
) -> DAGCollectionResponse:
117118
"""Get all DAGs."""
118-
dag_runs_select = None
119+
query = select(DagModel)
119120

120-
if dag_run_state.value or dag_run_start_date_range.is_active() or dag_run_end_date_range.is_active():
121-
dag_runs_select, _ = paginated_select(
122-
statement=select(DagRun),
121+
max_run_id_query = ( # ordering by id will not always be "latest run", but it's a simplifying assumption
122+
select(DagRun.dag_id, func.max(DagRun.id).label("max_dag_run_id"))
123+
.where(DagRun.start_date.is_not(null()))
124+
.group_by(DagRun.dag_id)
125+
.subquery(name="mrq")
126+
)
127+
128+
has_max_run_filter = (
129+
dag_run_state.value
130+
or last_dag_run_state.value
131+
or dag_run_start_date_range.is_active()
132+
or dag_run_end_date_range.is_active()
133+
)
134+
135+
if has_max_run_filter or order_by.value in (
136+
"last_run_state",
137+
"last_run_start_date",
138+
"-last_run_state",
139+
"-last_run_start_date",
140+
):
141+
query = query.join(
142+
max_run_id_query,
143+
DagModel.dag_id == max_run_id_query.c.dag_id,
144+
isouter=True,
145+
).join(DagRun, DagRun.id == max_run_id_query.c.max_dag_run_id, isouter=True)
146+
147+
if has_max_run_filter:
148+
query = apply_filters_to_select(
149+
statement=query,
123150
filters=[
124151
dag_run_start_date_range,
125152
dag_run_end_date_range,
126153
dag_run_state,
154+
last_dag_run_state,
127155
],
128-
session=session,
129156
)
130-
dag_runs_select = dag_runs_select.cte()
131157

132158
dags_select, total_entries = paginated_select(
133-
statement=generate_dag_with_latest_run_query(dag_runs_select),
159+
statement=query,
134160
filters=[
135161
exclude_stale,
136162
paused,
137163
dag_id_pattern,
138164
dag_display_name_pattern,
139165
tags,
140166
owners,
141-
last_dag_run_state,
142167
readable_dags_filter,
143168
],
144169
order_by=order_by,

airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dags.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ class TestGetDags(TestDagEndpoint):
172172
({"last_dag_run_state": "success", "exclude_stale": False}, 1, [DAG3_ID]),
173173
({"last_dag_run_state": "failed", "exclude_stale": False}, 1, [DAG1_ID]),
174174
({"dag_run_state": "failed"}, 1, [DAG1_ID]),
175-
({"dag_run_state": "failed", "exclude_stale": False}, 2, [DAG1_ID, DAG3_ID]),
175+
({"dag_run_state": "failed", "exclude_stale": False}, 1, [DAG1_ID]),
176176
(
177177
{"dag_run_start_date_gte": DAG3_START_DATE_2.isoformat(), "exclude_stale": False},
178178
1,
@@ -220,10 +220,10 @@ class TestGetDags(TestDagEndpoint):
220220
"dag_run_state": "failed",
221221
"exclude_stale": False,
222222
},
223-
1,
224-
[DAG3_ID],
223+
0,
224+
[],
225225
),
226-
# # Sort
226+
# Sort
227227
({"order_by": "-dag_id"}, 2, [DAG2_ID, DAG1_ID]),
228228
({"order_by": "-dag_display_name"}, 2, [DAG2_ID, DAG1_ID]),
229229
({"order_by": "dag_display_name"}, 2, [DAG1_ID, DAG2_ID]),

airflow-ctl/docs/images/command_hashes.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,6 @@ dagrun:7b3e06a3664cc7ceb18457b4c0895532
99
jobs:806174e6c9511db669705279ed6a00b9
1010
pools:2c17a4131b6481bd8fe9120982606db2
1111
providers:d053e6f17ff271e1e08942378344d27b
12-
variables:d9001295d77adefbd68e389f6622b89a
12+
variables:cd3970589b2cb1e3ebd9a0b7f2ffdf4d
1313
version:11da98f530c37754403a87151cbe2274
1414
auth login:348c25d49128b6007ac97dae2ef7563f

airflow-ctl/docs/images/output_variables.svg

Lines changed: 44 additions & 36 deletions
Loading

airflow-ctl/src/airflowctl/ctl/cli_config.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,23 @@ def __call__(self, parser, namespace, values, option_string=None):
198198
action=Password,
199199
nargs="?",
200200
)
201+
ARG_VARIABLE_IMPORT = Arg(
202+
flags=("file",),
203+
metavar="file",
204+
help="Import variables from JSON file",
205+
)
206+
ARG_VARIABLE_ACTION_ON_EXISTING_KEY = Arg(
207+
flags=("-a", "--action-on-existing-key"),
208+
type=str,
209+
default="overwrite",
210+
help="Action to take if we encounter a variable key that already exists.",
211+
choices=("overwrite", "fail", "skip"),
212+
)
213+
ARG_VARIABLE_EXPORT = Arg(
214+
flags=("file",),
215+
metavar="file",
216+
help="Export all variables to JSON file",
217+
)
201218

202219
ARG_OUTPUT = Arg(
203220
flags=("-o", "--output"),
@@ -626,6 +643,21 @@ def merge_commands(
626643
),
627644
)
628645

646+
VARIABLE_COMMANDS = (
647+
ActionCommand(
648+
name="import",
649+
help="Import variables",
650+
func=lazy_load_command("airflowctl.ctl.commands.variable_command.import_"),
651+
args=(ARG_VARIABLE_IMPORT, ARG_VARIABLE_ACTION_ON_EXISTING_KEY),
652+
),
653+
ActionCommand(
654+
name="export",
655+
help="Export all variables",
656+
func=lazy_load_command("airflowctl.ctl.commands.variable_command.export"),
657+
args=(ARG_VARIABLE_EXPORT,),
658+
),
659+
)
660+
629661
core_commands: list[CLICommand] = [
630662
GroupCommand(
631663
name="auth",
@@ -638,6 +670,11 @@ def merge_commands(
638670
help="Manage Airflow pools",
639671
subcommands=POOL_COMMANDS,
640672
),
673+
GroupCommand(
674+
name="variables",
675+
help="Manage Airflow variables",
676+
subcommands=VARIABLE_COMMANDS,
677+
),
641678
]
642679
# Add generated group commands
643680
core_commands = merge_commands(

0 commit comments

Comments
 (0)