Commit ff3d500

Merge branch 'main' into update-public-interface-docs
2 parents 3af3ead + a537bce

File tree: 3 files changed, 107 additions & 12 deletions

airflow-core/src/airflow/utils/db_cleanup.py

Lines changed: 57 additions & 11 deletions
@@ -73,6 +73,7 @@ class _TableConfig:
         in the table. to ignore certain records even if they are the latest in the table, you can
         supply additional filters here (e.g. externally triggered dag runs)
     :param keep_last_group_by: if keeping the last record, can keep the last record for each group
+    :param dependent_tables: list of tables which have FK relationship with this table
     """

     table_name: str
@@ -81,6 +82,10 @@ class _TableConfig:
     keep_last: bool = False
     keep_last_filters: Any | None = None
     keep_last_group_by: Any | None = None
+    # We explicitly list these tables instead of detecting foreign keys automatically,
+    # because the relationships are unlikely to change and the number of tables is small.
+    # Relying on automation here would increase complexity and reduce maintainability.
+    dependent_tables: list[str] | None = None

     def __post_init__(self):
         self.recency_column = column(self.recency_column_name)
@@ -104,29 +109,46 @@ def readable_config(self):

 config_list: list[_TableConfig] = [
     _TableConfig(table_name="job", recency_column_name="latest_heartbeat"),
-    _TableConfig(table_name="dag", recency_column_name="last_parsed_time"),
+    _TableConfig(
+        table_name="dag",
+        recency_column_name="last_parsed_time",
+        dependent_tables=["dag_version", "deadline"],
+    ),
     _TableConfig(
         table_name="dag_run",
         recency_column_name="start_date",
         extra_columns=["dag_id", "run_type"],
         keep_last=True,
         keep_last_filters=[column("run_type") != DagRunType.MANUAL],
         keep_last_group_by=["dag_id"],
+        dependent_tables=["task_instance", "deadline"],
     ),
     _TableConfig(table_name="asset_event", recency_column_name="timestamp"),
     _TableConfig(table_name="import_error", recency_column_name="timestamp"),
     _TableConfig(table_name="log", recency_column_name="dttm"),
     _TableConfig(table_name="sla_miss", recency_column_name="timestamp"),
-    _TableConfig(table_name="task_instance", recency_column_name="start_date"),
+    _TableConfig(
+        table_name="task_instance",
+        recency_column_name="start_date",
+        dependent_tables=["task_instance_history", "xcom"],
+    ),
     _TableConfig(table_name="task_instance_history", recency_column_name="start_date"),
     _TableConfig(table_name="task_reschedule", recency_column_name="start_date"),
     _TableConfig(table_name="xcom", recency_column_name="timestamp"),
     _TableConfig(table_name="_xcom_archive", recency_column_name="timestamp"),
     _TableConfig(table_name="callback_request", recency_column_name="created_at"),
     _TableConfig(table_name="celery_taskmeta", recency_column_name="date_done"),
     _TableConfig(table_name="celery_tasksetmeta", recency_column_name="date_done"),
-    _TableConfig(table_name="trigger", recency_column_name="created_date"),
-    _TableConfig(table_name="dag_version", recency_column_name="created_at"),
+    _TableConfig(
+        table_name="trigger",
+        recency_column_name="created_date",
+        dependent_tables=["task_instance"],
+    ),
+    _TableConfig(
+        table_name="dag_version",
+        recency_column_name="created_at",
+        dependent_tables=["task_instance", "dag_run"],
+    ),
     _TableConfig(table_name="deadline", recency_column_name="deadline_time"),
 ]

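Taken together, the dependent_tables entries above declare a small foreign-key graph. A minimal sketch of the declared edges (table names copied verbatim from the config; the fk_dependents name is illustrative, not part of the module):

# Child tables holding FK references to each parent. The cleanup traversal
# visits these before the parent, so dependent rows are handled first.
fk_dependents: dict[str, list[str]] = {
    "dag": ["dag_version", "deadline"],
    "dag_run": ["task_instance", "deadline"],
    "task_instance": ["task_instance_history", "xcom"],
    "trigger": ["task_instance"],
    "dag_version": ["task_instance", "dag_run"],
}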
@@ -234,6 +256,7 @@ def _do_delete(
         logger.debug("delete statement:\n%s", delete.compile())
         session.execute(delete)
         session.commit()
+
     except BaseException as e:
         raise e
     finally:
@@ -414,17 +437,37 @@ def _suppress_with_logging(table: str, session: Session):
         session.rollback()


-def _effective_table_names(*, table_names: list[str] | None) -> tuple[set[str], dict[str, _TableConfig]]:
+def _effective_table_names(*, table_names: list[str] | None) -> tuple[list[str], dict[str, _TableConfig]]:
     desired_table_names = set(table_names or config_dict)
-    effective_config_dict = {k: v for k, v in config_dict.items() if k in desired_table_names}
-    effective_table_names = set(effective_config_dict)
-    if desired_table_names != effective_table_names:
-        outliers = desired_table_names - effective_table_names
+
+    outliers = desired_table_names - set(config_dict.keys())
+    if outliers:
         logger.warning(
-            "The following table(s) are not valid choices and will be skipped: %s", sorted(outliers)
+            "The following table(s) are not valid choices and will be skipped: %s",
+            sorted(outliers),
         )
-    if not effective_table_names:
+        desired_table_names = desired_table_names - outliers
+
+    visited: set[str] = set()
+    effective_table_names: list[str] = []
+
+    def collect_deps(table: str):
+        if table in visited:
+            return
+        visited.add(table)
+        config = config_dict[table]
+        for dep in config.dependent_tables or []:
+            collect_deps(dep)
+        effective_table_names.append(table)
+
+    for table_name in desired_table_names:
+        collect_deps(table_name)
+
+    effective_config_dict = {n: config_dict[n] for n in effective_table_names}
+
+    if not effective_config_dict:
         raise SystemExit("No tables selected for db cleanup. Please choose valid table names.")
+
     return effective_table_names, effective_config_dict


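A minimal, self-contained sketch of the resolution logic above, with the graph hard-coded from two config_list entries (the real function reads config_dict and each _TableConfig.dependent_tables):

# Post-order DFS: every table's FK dependents are emitted before the table
# itself; the visited set also protects against cycles in the declared graph.
dependent_tables = {
    "dag_run": ["task_instance", "deadline"],
    "task_instance": ["task_instance_history", "xcom"],
}
visited: set[str] = set()
order: list[str] = []

def collect_deps(table: str) -> None:
    if table in visited:
        return
    visited.add(table)
    for dep in dependent_tables.get(table, []):
        collect_deps(dep)
    order.append(table)

collect_deps("dag_run")
print(order)
# ['task_instance_history', 'xcom', 'task_instance', 'deadline', 'dag_run']

Requesting only dag_run therefore pulls in its four dependent tables, and dag_run itself lands last in the processing order.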
@@ -480,6 +523,8 @@ def run_cleanup(
     :param session: Session representing connection to the metadata database.
     """
     clean_before_timestamp = timezone.coerce_datetime(clean_before_timestamp)
+
+    # Get all tables to clean (root + dependents)
     effective_table_names, effective_config_dict = _effective_table_names(table_names=table_names)
     if dry_run:
         print("Performing dry run for db cleanup.")
@@ -491,6 +536,7 @@ def run_cleanup(
     if not dry_run and confirm:
         _confirm_delete(date=clean_before_timestamp, tables=sorted(effective_table_names))
     existing_tables = reflect_tables(tables=None, session=session).tables
+
     for table_name, table_config in effective_config_dict.items():
         if table_name in existing_tables:
             with _suppress_with_logging(table_name, session):

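For illustration, a hedged dry-run invocation of the updated entry point (import paths assumed from the file location; the keyword arguments mirror the test below):

import pendulum

from airflow.utils.db_cleanup import run_cleanup
from airflow.utils.session import create_session

with create_session() as session:
    run_cleanup(
        clean_before_timestamp=pendulum.datetime(2022, 1, 11, tz="UTC"),
        table_names=["dag_run"],  # FK dependents are now included automatically
        dry_run=True,  # prints what would be cleaned without deleting anything
        confirm=False,
        session=session,
    )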
airflow-core/tests/unit/utils/test_db_cleanup.py

Lines changed: 46 additions & 1 deletion
@@ -26,7 +26,7 @@

 import pendulum
 import pytest
-from sqlalchemy import text
+from sqlalchemy import inspect, text
 from sqlalchemy.exc import OperationalError, SQLAlchemyError
 from sqlalchemy.ext.declarative import DeclarativeMeta

@@ -303,6 +303,51 @@ def test__cleanup_table(self, table_name, date_add_kwargs, expected_to_delete, r
         else:
             raise Exception("unexpected")

+    @pytest.mark.parametrize(
+        "table_name, expected_archived",
+        [
+            (
+                "dag_run",
+                {"dag_run", "task_instance"},  # Only these are populated
+            ),
+        ],
+    )
+    def test_run_cleanup_archival_integration(self, table_name, expected_archived):
+        """
+        Integration test that verifies:
+        1. Recursive FK-dependent tables are resolved via _effective_table_names().
+        2. run_cleanup() archives only tables with data.
+        3. Archive tables are not created for empty dependent tables.
+        """
+        base_date = pendulum.datetime(2022, 1, 1, tz="UTC")
+        num_tis = 5
+
+        # Create test data for DAG Run and TIs
+        if table_name in {"dag_run", "task_instance"}:
+            create_tis(base_date=base_date, num_tis=num_tis, run_type=DagRunType.MANUAL)
+
+        clean_before_date = base_date.add(days=10)
+
+        with create_session() as session:
+            run_cleanup(
+                clean_before_timestamp=clean_before_date,
+                table_names=[table_name],
+                dry_run=False,
+                confirm=False,
+                session=session,
+            )
+
+            # Inspect archive tables created
+            inspector = inspect(session.bind)
+            archive_tables = {
+                name for name in inspector.get_table_names() if name.startswith(ARCHIVE_TABLE_PREFIX)
+            }
+            actual_archived = {t.split("__", 1)[-1].split("__")[0] for t in archive_tables}
+
+            assert expected_archived <= actual_archived, (
+                f"Expected archive tables not found: {expected_archived - actual_archived}"
+            )
+
     @pytest.mark.parametrize(
         "skip_archive, expected_archives",
         [pytest.param(True, 1, id="skip_archive"), pytest.param(False, 2, id="do_archive")],

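The set comprehension in this test recovers the source table name from each archive table. Assuming the _airflow_deleted__<table>__<timestamp> naming scheme implied by ARCHIVE_TABLE_PREFIX, the parsing works like this:

# Hypothetical archive table name; format assumed as <prefix>__<table>__<timestamp>.
name = "_airflow_deleted__dag_run__20220111000000"
print(name.split("__", 1)[-1].split("__")[0])  # -> dag_run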
chart/templates/NOTES.txt

Lines changed: 4 additions & 0 deletions
@@ -98,7 +98,11 @@ Flower dashboard:
 {{- else }}
 You can now access your dashboard(s) by executing the following command(s) and visiting the corresponding port at localhost in your browser:

+{{- if semverCompare "<3.0.0" .Values.airflowVersion }}
 Airflow Webserver: kubectl port-forward svc/{{ include "airflow.fullname" . }}-webserver {{ .Values.ports.airflowUI }}:{{ .Values.ports.airflowUI }} --namespace {{ .Release.Namespace }}
+{{- else }}
+Airflow API Server: kubectl port-forward svc/{{ include "airflow.fullname" . }}-api-server {{ .Values.ports.airflowUI }}:{{ .Values.ports.airflowUI }} --namespace {{ .Release.Namespace }}
+{{- end }}

 {{- if .Values.flower.enabled }}
 {{- if or (contains "CeleryExecutor" .Values.executor) (contains "CeleryKubernetesExecutor" .Values.executor)}}

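With airflowVersion at 3.0.0 or later, the rendered NOTES now point at the API server instead of the webserver. For example, assuming a release named airflow in namespace default and the chart's default UI port of 8080, the post-install output would read roughly:

Airflow API Server: kubectl port-forward svc/airflow-api-server 8080:8080 --namespace default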