Skip to content

Commit ea45411

Browse files
committed
fix tests; use right serdag
1 parent 2603597 commit ea45411

File tree

4 files changed

+162
-201
lines changed

4 files changed

+162
-201
lines changed

airflow-core/src/airflow/models/baseoperator.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -381,7 +381,7 @@ def clear(
381381
# definition code
382382
assert isinstance(self.dag, SchedulerDAG)
383383

384-
clear_task_instances(results, session, dag=self.dag)
384+
clear_task_instances(results, session)
385385
session.commit()
386386
return count
387387

airflow-core/src/airflow/models/dag.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1526,7 +1526,6 @@ def clear(
15261526
clear_task_instances(
15271527
list(tis),
15281528
session,
1529-
dag=self,
15301529
dag_run_state=dag_run_state,
15311530
)
15321531
else:

airflow-core/src/airflow/models/taskinstance.py

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,6 @@
9494
from airflow.listeners.listener import get_listener_manager
9595
from airflow.models.asset import AssetActive, AssetEvent, AssetModel
9696
from airflow.models.base import Base, StringID, TaskInstanceDependencies
97-
from airflow.models.dagbag import DagBag
9897
from airflow.models.log import Log
9998
from airflow.models.renderedtifields import get_serialized_template_fields
10099
from airflow.models.taskinstancekey import TaskInstanceKey
@@ -255,7 +254,6 @@ def _stop_remaining_tasks(*, task_instance: TaskInstance, task_teardown_map=None
255254
def clear_task_instances(
256255
tis: list[TaskInstance],
257256
session: Session,
258-
dag: DAG | None = None,
259257
dag_run_state: DagRunState | Literal[False] = DagRunState.QUEUED,
260258
) -> None:
261259
"""
@@ -271,11 +269,13 @@ def clear_task_instances(
271269
:param session: current session
272270
:param dag_run_state: state to set finished DagRuns to.
273271
If set to False, DagRuns state will not be changed.
274-
:param dag: DAG object
272+
273+
:meta private:
275274
"""
276-
# taskinstance uuids:
277275
task_instance_ids: list[str] = []
278-
dag_bag = DagBag(read_dags_from_db=True)
276+
from airflow.jobs.scheduler_job_runner import SchedulerDagBag
277+
278+
scheduler_dagbag = SchedulerDagBag()
279279

280280
for ti in tis:
281281
task_instance_ids.append(ti.id)
@@ -285,7 +285,12 @@ def clear_task_instances(
285285
# the task is terminated and becomes eligible for retry.
286286
ti.state = TaskInstanceState.RESTARTING
287287
else:
288-
ti_dag = dag if dag and dag.dag_id == ti.dag_id else dag_bag.get_dag(ti.dag_id, session=session)
288+
dr = ti.dag_run
289+
ti_dag = scheduler_dagbag.get_dag(dag_run=dr, session=session)
290+
if not ti_dag:
291+
raise AirflowException(
292+
f"Serialized dag not found for dag run. dag_id={dr.dag_id} run_id={dr.run_id}"
293+
)
289294
task_id = ti.task_id
290295
if ti_dag and ti_dag.has_task(task_id):
291296
task = ti_dag.get_task(task_id)
@@ -327,15 +332,18 @@ def clear_task_instances(
327332
if dr.state in State.finished_dr_states:
328333
dr.state = dag_run_state
329334
dr.start_date = timezone.utcnow()
330-
if TYPE_CHECKING:
331-
assert dag # todo: change signature so this is required
332-
if not dag.disable_bundle_versioning:
335+
dr_dag = scheduler_dagbag.get_dag(dag_run=dr, session=session)
336+
if not dr_dag:
337+
raise AirflowException(
338+
f"Serialized dag not found for dag run. dag_id={dr.dag_id} run_id={dr.run_id}"
339+
)
340+
if not dr_dag.disable_bundle_versioning:
333341
if dr.dag_model:
334342
bundle_version = dr.dag_model.bundle_version
335343
else:
336344
bundle_version = session.scalar(
337345
select(DagModel.bundle_version).where(
338-
DagModel.dag_id == dag.dag_id,
346+
DagModel.dag_id == dr_dag.dag_id,
339347
)
340348
)
341349
if bundle_version is not None:

0 commit comments

Comments
 (0)