Merged
Commits (21)
4c2502b
enhance cleanup logic to archive dependent tables first
vatsrahul1001 Jun 20, 2025
2012c88
adding flag to just archive fk tables and let cascade take care of de…
vatsrahul1001 Jun 20, 2025
0ff24d7
add test
vatsrahul1001 Jun 23, 2025
07206f4
Merge branch 'main' into enhance-db-cleanup
vatsrahul1001 Jun 23, 2025
182975c
Merge branch 'main' into enhance-db-cleanup
vatsrahul1001 Jun 23, 2025
1f5f481
Adding hardcoded FK details for tables
vatsrahul1001 Jun 23, 2025
a2e9ac1
Merge branch 'enhance-db-cleanup' of github.com:astronomer/airflow in…
vatsrahul1001 Jun 23, 2025
0b37b80
fix table removal by typo
vatsrahul1001 Jun 23, 2025
4d867fa
remove task rechedule from archival
vatsrahul1001 Jun 23, 2025
ad9c1a3
add only direct dependency and let code find all indirects
vatsrahul1001 Jun 23, 2025
aac0e37
Merge branch 'main' into enhance-db-cleanup
vatsrahul1001 Jun 23, 2025
2325921
update logic to find dependent table sin _effective_table_names method
vatsrahul1001 Jun 24, 2025
f30788a
Merge branch 'enhance-db-cleanup' of github.com:astronomer/airflow in…
vatsrahul1001 Jun 24, 2025
5bfdb36
Refactor how we loop over the tables
jedcunningham Jun 24, 2025
827ba5e
adding deadline table as dependent to dag_run
vatsrahul1001 Jun 24, 2025
2249725
Be loud if we have a dependent misconfigured
jedcunningham Jun 24, 2025
e5ae70f
Fix detection of unknown tables in table list
jedcunningham Jun 24, 2025
a490383
adding deadline table as dependent to dag
vatsrahul1001 Jun 24, 2025
df11f3b
Remove skip_delete
jedcunningham Jun 24, 2025
e0a6ea8
Update airflow-core/src/airflow/utils/db_cleanup.py
jedcunningham Jun 24, 2025
685c629
Merge branch 'main' into enhance-db-cleanup
jedcunningham Jun 24, 2025
68 changes: 57 additions & 11 deletions airflow-core/src/airflow/utils/db_cleanup.py
@@ -73,6 +73,7 @@ class _TableConfig:
in the table. to ignore certain records even if they are the latest in the table, you can
supply additional filters here (e.g. externally triggered dag runs)
:param keep_last_group_by: if keeping the last record, can keep the last record for each group
:param dependent_tables: list of tables which have FK relationship with this table
"""

table_name: str
@@ -81,6 +82,10 @@
keep_last: bool = False
keep_last_filters: Any | None = None
keep_last_group_by: Any | None = None
# We explicitly list these tables instead of detecting foreign keys automatically,
Reviewer comment (Member): Nice!
# because the relationships are unlikely to change and the number of tables is small.
# Relying on automation here would increase complexity and reduce maintainability.
dependent_tables: list[str] | None = None

def __post_init__(self):
self.recency_column = column(self.recency_column_name)
@@ -104,29 +109,46 @@ def readable_config(self):

config_list: list[_TableConfig] = [
_TableConfig(table_name="job", recency_column_name="latest_heartbeat"),
_TableConfig(table_name="dag", recency_column_name="last_parsed_time"),
_TableConfig(
table_name="dag",
recency_column_name="last_parsed_time",
dependent_tables=["dag_version", "deadline"],
),
_TableConfig(
table_name="dag_run",
recency_column_name="start_date",
extra_columns=["dag_id", "run_type"],
keep_last=True,
keep_last_filters=[column("run_type") != DagRunType.MANUAL],
keep_last_group_by=["dag_id"],
dependent_tables=["task_instance", "deadline"],
),
_TableConfig(table_name="asset_event", recency_column_name="timestamp"),
_TableConfig(table_name="import_error", recency_column_name="timestamp"),
_TableConfig(table_name="log", recency_column_name="dttm"),
_TableConfig(table_name="sla_miss", recency_column_name="timestamp"),
_TableConfig(table_name="task_instance", recency_column_name="start_date"),
_TableConfig(
table_name="task_instance",
recency_column_name="start_date",
dependent_tables=["task_instance_history", "xcom"],
),
_TableConfig(table_name="task_instance_history", recency_column_name="start_date"),
_TableConfig(table_name="task_reschedule", recency_column_name="start_date"),
_TableConfig(table_name="xcom", recency_column_name="timestamp"),
_TableConfig(table_name="_xcom_archive", recency_column_name="timestamp"),
_TableConfig(table_name="callback_request", recency_column_name="created_at"),
_TableConfig(table_name="celery_taskmeta", recency_column_name="date_done"),
_TableConfig(table_name="celery_tasksetmeta", recency_column_name="date_done"),
_TableConfig(table_name="trigger", recency_column_name="created_date"),
_TableConfig(table_name="dag_version", recency_column_name="created_at"),
_TableConfig(
table_name="trigger",
recency_column_name="created_date",
dependent_tables=["task_instance"],
),
_TableConfig(
table_name="dag_version",
recency_column_name="created_at",
dependent_tables=["task_instance", "dag_run"],
),
_TableConfig(table_name="deadline", recency_column_name="deadline_time"),
]

@@ -234,6 +256,7 @@ def _do_delete(
logger.debug("delete statement:\n%s", delete.compile())
session.execute(delete)
session.commit()

except BaseException as e:
raise e
finally:
@@ -414,17 +437,37 @@ def _suppress_with_logging(table: str, session: Session):
session.rollback()


def _effective_table_names(*, table_names: list[str] | None) -> tuple[set[str], dict[str, _TableConfig]]:
def _effective_table_names(*, table_names: list[str] | None) -> tuple[list[str], dict[str, _TableConfig]]:
desired_table_names = set(table_names or config_dict)
effective_config_dict = {k: v for k, v in config_dict.items() if k in desired_table_names}
effective_table_names = set(effective_config_dict)
if desired_table_names != effective_table_names:
outliers = desired_table_names - effective_table_names

outliers = desired_table_names - set(config_dict.keys())
if outliers:
logger.warning(
"The following table(s) are not valid choices and will be skipped: %s", sorted(outliers)
"The following table(s) are not valid choices and will be skipped: %s",
sorted(outliers),
)
if not effective_table_names:
desired_table_names = desired_table_names - outliers

visited: set[str] = set()
effective_table_names: list[str] = []

def collect_deps(table: str):
if table in visited:
return
visited.add(table)
config = config_dict[table]
for dep in config.dependent_tables or []:
collect_deps(dep)
effective_table_names.append(table)

for table_name in desired_table_names:
collect_deps(table_name)

effective_config_dict = {n: config_dict[n] for n in effective_table_names}

if not effective_config_dict:
raise SystemExit("No tables selected for db cleanup. Please choose valid table names.")

return effective_table_names, effective_config_dict


@@ -480,6 +523,8 @@ def run_cleanup(
:param session: Session representing connection to the metadata database.
"""
clean_before_timestamp = timezone.coerce_datetime(clean_before_timestamp)

# Get all tables to clean (root + dependents)
effective_table_names, effective_config_dict = _effective_table_names(table_names=table_names)
if dry_run:
print("Performing dry run for db cleanup.")
@@ -491,6 +536,7 @@
if not dry_run and confirm:
_confirm_delete(date=clean_before_timestamp, tables=sorted(effective_table_names))
existing_tables = reflect_tables(tables=None, session=session).tables

for table_name, table_config in effective_config_dict.items():
if table_name in existing_tables:
with _suppress_with_logging(table_name, session):
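For reference, the dependency resolution added to _effective_table_names above is a post-order walk: each table's dependents are visited before the table itself, and already-visited tables are skipped so shared dependents such as task_instance are only processed once. Below is a minimal standalone sketch of that ordering, using the dependent_tables values declared in config_list; the resolve() helper is illustrative and not part of airflow.utils.db_cleanup.

# Minimal sketch of the post-order dependency walk in _effective_table_names.
# The mapping mirrors the dependent_tables entries declared in config_list above;
# resolve() is a hypothetical helper used only to show the resulting order.
dependent_tables = {
    "dag": ["dag_version", "deadline"],
    "dag_run": ["task_instance", "deadline"],
    "task_instance": ["task_instance_history", "xcom"],
    "trigger": ["task_instance"],
    "dag_version": ["task_instance", "dag_run"],
}

def resolve(requested: list[str]) -> list[str]:
    visited: set[str] = set()
    ordered: list[str] = []

    def collect_deps(table: str) -> None:
        if table in visited:
            return
        visited.add(table)
        for dep in dependent_tables.get(table, []):
            collect_deps(dep)
        # A table is appended only after all of its dependents,
        # so dependent (child) tables are archived/deleted first.
        ordered.append(table)

    for table in requested:
        collect_deps(table)
    return ordered

print(resolve(["dag_run"]))
# ['task_instance_history', 'xcom', 'task_instance', 'deadline', 'dag_run']

Requesting only "dag_run" therefore also covers task_instance_history, xcom, task_instance, and deadline, each handled before its parent; requesting "dag" additionally pulls in dag_version and, through it, dag_run and its dependents.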
47 changes: 46 additions & 1 deletion airflow-core/tests/unit/utils/test_db_cleanup.py
@@ -26,7 +26,7 @@

import pendulum
import pytest
from sqlalchemy import text
from sqlalchemy import inspect, text
from sqlalchemy.exc import OperationalError, SQLAlchemyError
from sqlalchemy.ext.declarative import DeclarativeMeta

@@ -303,6 +303,51 @@ def test__cleanup_table(self, table_name, date_add_kwargs, expected_to_delete, r
else:
raise Exception("unexpected")

@pytest.mark.parametrize(
"table_name, expected_archived",
[
(
"dag_run",
{"dag_run", "task_instance"}, # Only these are populated
),
],
)
def test_run_cleanup_archival_integration(self, table_name, expected_archived):
"""
Integration test that verifies:
1. Recursive FK-dependent tables are resolved via _effective_table_names().
2. run_cleanup() archives only tables with data.
3. Archive tables are not created for empty dependent tables.
"""
base_date = pendulum.datetime(2022, 1, 1, tz="UTC")
num_tis = 5

# Create test data for DAG Run and TIs
if table_name in {"dag_run", "task_instance"}:
create_tis(base_date=base_date, num_tis=num_tis, run_type=DagRunType.MANUAL)

clean_before_date = base_date.add(days=10)

with create_session() as session:
run_cleanup(
clean_before_timestamp=clean_before_date,
table_names=[table_name],
dry_run=False,
confirm=False,
session=session,
)

# Inspect archive tables created
inspector = inspect(session.bind)
archive_tables = {
name for name in inspector.get_table_names() if name.startswith(ARCHIVE_TABLE_PREFIX)
}
actual_archived = {t.split("__", 1)[-1].split("__")[0] for t in archive_tables}

assert expected_archived <= actual_archived, (
f"Expected archive tables not found: {expected_archived - actual_archived}"
)

@pytest.mark.parametrize(
"skip_archive, expected_archives",
[pytest.param(True, 1, id="skip_archive"), pytest.param(False, 2, id="do_archive")],
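The new integration test above exercises this end to end. For a rough idea of how the feature is used directly, here is a hedged sketch modeled on that test's call; the timestamp and table name are examples, and create_session is assumed to come from airflow.utils.session as in the existing test helpers.

# Illustrative call mirroring the test above: requesting only "dag_run" also
# covers the FK-dependent tables resolved for it (e.g. task_instance).
# Assumes an initialized Airflow metadata database; values here are examples.
import pendulum

from airflow.utils.db_cleanup import run_cleanup
from airflow.utils.session import create_session

with create_session() as session:
    run_cleanup(
        clean_before_timestamp=pendulum.datetime(2025, 1, 1, tz="UTC"),
        table_names=["dag_run"],  # dependents are added by _effective_table_names()
        dry_run=True,             # preview only; no data is modified in this sketch
        confirm=False,
        session=session,
    )

dry_run=True is used here so the sketch does not modify data; the test above runs with dry_run=False and then inspects which _airflow_deleted__* archive tables were created.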