Skip to content

Commit 58d0c88

Browse files
dstandish and uranusjr
authored and committed
Use latest bundle version when clearing / re-running dag (#50040)
Co-authored-by: Tzu-ping Chung <[email protected]> (cherry picked from commit c069401)
1 parent d2d521f commit 58d0c88

File tree

14 files changed

+338
-307
lines changed

14 files changed

+338
-307
lines changed

airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -708,7 +708,6 @@ def post_clear_task_instances(
708708
clear_task_instances(
709709
task_instances,
710710
session,
711-
dag,
712711
DagRunState.QUEUED if reset_dag_runs else False,
713712
)
714713

airflow-core/src/airflow/models/baseoperator.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -381,7 +381,7 @@ def clear(
381381
# definition code
382382
assert isinstance(self.dag, SchedulerDAG)
383383

384-
clear_task_instances(results, session, dag=self.dag)
384+
clear_task_instances(results, session)
385385
session.commit()
386386
return count
387387

airflow-core/src/airflow/models/dag.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,6 @@
9696
from airflow.sdk.definitions.asset import Asset, AssetAlias, AssetUniqueKey, BaseAsset
9797
from airflow.sdk.definitions.dag import DAG as TaskSDKDag, dag as task_sdk_dag_decorator
9898
from airflow.secrets.local_filesystem import LocalFilesystemBackend
99-
from airflow.security import permissions
10099
from airflow.settings import json
101100
from airflow.stats import Stats
102101
from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable
@@ -468,6 +467,7 @@ def _upgrade_outdated_dag_access_control(access_control=None):
468467
return None
469468

470469
from airflow.providers.fab import __version__ as FAB_VERSION
470+
from airflow.providers.fab.www.security import permissions
471471

472472
updated_access_control = {}
473473
for role, perms in access_control.items():
@@ -1526,7 +1526,6 @@ def clear(
15261526
clear_task_instances(
15271527
list(tis),
15281528
session,
1529-
dag=self,
15301529
dag_run_state=dag_run_state,
15311530
)
15321531
else:

airflow-core/src/airflow/models/taskinstance.py

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,6 @@
9494
from airflow.listeners.listener import get_listener_manager
9595
from airflow.models.asset import AssetActive, AssetEvent, AssetModel
9696
from airflow.models.base import Base, StringID, TaskInstanceDependencies
97-
from airflow.models.dagbag import DagBag
9897
from airflow.models.log import Log
9998
from airflow.models.renderedtifields import get_serialized_template_fields
10099
from airflow.models.taskinstancekey import TaskInstanceKey
@@ -255,7 +254,6 @@ def _stop_remaining_tasks(*, task_instance: TaskInstance, task_teardown_map=None
255254
def clear_task_instances(
256255
tis: list[TaskInstance],
257256
session: Session,
258-
dag: DAG | None = None,
259257
dag_run_state: DagRunState | Literal[False] = DagRunState.QUEUED,
260258
) -> None:
261259
"""
@@ -271,11 +269,13 @@ def clear_task_instances(
271269
:param session: current session
272270
:param dag_run_state: state to set finished DagRuns to.
273271
If set to False, DagRuns state will not be changed.
274-
:param dag: DAG object
272+
273+
:meta private:
275274
"""
276-
# taskinstance uuids:
277275
task_instance_ids: list[str] = []
278-
dag_bag = DagBag(read_dags_from_db=True)
276+
from airflow.jobs.scheduler_job_runner import SchedulerDagBag
277+
278+
scheduler_dagbag = SchedulerDagBag()
279279

280280
for ti in tis:
281281
task_instance_ids.append(ti.id)
@@ -285,7 +285,10 @@ def clear_task_instances(
285285
# the task is terminated and becomes eligible for retry.
286286
ti.state = TaskInstanceState.RESTARTING
287287
else:
288-
ti_dag = dag if dag and dag.dag_id == ti.dag_id else dag_bag.get_dag(ti.dag_id, session=session)
288+
dr = ti.dag_run
289+
ti_dag = scheduler_dagbag.get_dag(dag_run=dr, session=session)
290+
if not ti_dag:
291+
log.warning("No serialized dag found for dag '%s'", dr.dag_id)
289292
task_id = ti.task_id
290293
if ti_dag and ti_dag.has_task(task_id):
291294
task = ti_dag.get_task(task_id)
@@ -326,6 +329,13 @@ def clear_task_instances(
326329
if dr.state in State.finished_dr_states:
327330
dr.state = dag_run_state
328331
dr.start_date = timezone.utcnow()
332+
dr_dag = scheduler_dagbag.get_dag(dag_run=dr, session=session)
333+
if not dr_dag:
334+
log.warning("No serialized dag found for dag '%s'", dr.dag_id)
335+
if dr_dag and not dr_dag.disable_bundle_versioning:
336+
bundle_version = dr.dag_model.bundle_version
337+
if bundle_version is not None:
338+
dr.bundle_version = bundle_version
329339
if dag_run_state == DagRunState.QUEUED:
330340
dr.last_scheduling_decision = None
331341
dr.start_date = None

airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_log.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737

3838
from tests_common.test_utils.db import clear_db_runs
3939

40-
pytestmark = pytest.mark.db_test
40+
pytestmark = [pytest.mark.db_test, pytest.mark.need_serialized_dag]
4141

4242

4343
class TestTaskInstancesLog:

airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2258,8 +2258,7 @@ def test_clear_taskinstance_is_called_with_queued_dr_state(self, mock_clearti, t
22582258

22592259
# clear_task_instances no longer takes a dag argument, so only the task instances,
22602260
# the session (matched with mock.ANY) and the dag run state are asserted.
2261-
mock_clearti.assert_called_once_with([], mock.ANY, mock.ANY, DagRunState.QUEUED)
2262-
assert mock_clearti.call_args[0][2].dag_id == dag_id
2261+
mock_clearti.assert_called_once_with([], mock.ANY, DagRunState.QUEUED)
22632262

22642263
def test_clear_taskinstance_is_called_with_invalid_task_ids(self, test_client, session):
22652264
"""Test that dagrun is running when invalid task_ids are passed to clearTaskInstances API."""

airflow-core/tests/unit/models/test_backfill.py

Lines changed: 56 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
import pytest
2626
from sqlalchemy import select
2727

28-
from airflow.models import DagRun, TaskInstance
28+
from airflow.models import DagModel, DagRun, TaskInstance
2929
from airflow.models.backfill import (
3030
AlreadyRunningBackfill,
3131
Backfill,
@@ -152,6 +152,61 @@ def test_create_backfill_simple(reverse, existing, dag_maker, session):
152152
assert all(x.conf == expected_run_conf for x in dag_runs)
153153

154154

155+
def test_create_backfill_clear_existing_bundle_version(dag_maker, session):
156+
"""
157+
Verify that when backfill clears an existing dag run, its bundle version is updated to the latest.
158+
"""
159+
# two that will be reprocessed, and an old one not to be processed by backfill
160+
existing = ["1985-01-01", "2021-01-02", "2021-01-03"]
161+
run_ids = {d: f"scheduled_{d}" for d in existing}
162+
with dag_maker(schedule="@daily") as dag:
163+
PythonOperator(task_id="hi", python_callable=print)
164+
165+
dag_model = session.scalar(select(DagModel).where(DagModel.dag_id == dag.dag_id))
166+
first_bundle_version = "bundle_VclmpcTdXv"
167+
dag_model.bundle_version = first_bundle_version
168+
session.commit()
169+
for date in existing:
170+
dag_maker.create_dagrun(
171+
run_id=run_ids[date], logical_date=timezone.parse(date), session=session, state="failed"
172+
)
173+
session.commit()
174+
175+
# update bundle version
176+
new_bundle_version = "bundle_VclmpcTdXv-2"
177+
dag_model.bundle_version = new_bundle_version
178+
session.commit()
179+
180+
# verify that existing dag runs still have the first bundle version
181+
dag_runs = list(session.scalars(select(DagRun).where(DagRun.dag_id == dag.dag_id)))
182+
assert [x.bundle_version for x in dag_runs] == 3 * [first_bundle_version]
183+
assert [x.state for x in dag_runs] == 3 * ["failed"]
184+
session.commit()
185+
_create_backfill(
186+
dag_id=dag.dag_id,
187+
from_date=pendulum.parse("2021-01-01"),
188+
to_date=pendulum.parse("2021-01-05"),
189+
max_active_runs=10,
190+
reverse=False,
191+
dag_run_conf=None,
192+
reprocess_behavior=ReprocessBehavior.FAILED,
193+
)
194+
session.commit()
195+
196+
# verify that the old dag run (not included in backfill) still has first bundle version
197+
# but the latter 5, which are included in the backfill, have the latest bundle version
198+
dag_runs = sorted(
199+
session.scalars(
200+
select(DagRun).where(
201+
DagRun.dag_id == dag.dag_id,
202+
),
203+
),
204+
key=lambda x: x.logical_date,
205+
)
206+
expected = [first_bundle_version] + 5 * [new_bundle_version]
207+
assert [x.bundle_version for x in dag_runs] == expected
208+
209+
155210
@pytest.mark.parametrize(
156211
"reprocess_behavior, num_in_b, exc_reasons",
157212
[

0 commit comments

Comments
 (0)