Skip to content

Commit 370e0b7

Browse files
authored
Prevent DetachedInstanceError when processing executor event (#54334)
1 parent 4a4c0b9 commit 370e0b7

File tree

1 file changed

+4
-3
lines changed

1 file changed

+4
-3
lines changed

airflow-core/src/airflow/jobs/scheduler_job_runner.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@
3939

4040
from airflow import settings
4141
from airflow._shared.timezones import timezone
42-
from airflow.api_fastapi.execution_api.datamodels.taskinstance import TIRunContext
42+
from airflow.api_fastapi.execution_api.datamodels.taskinstance import DagRun as DRDataModel, TIRunContext
4343
from airflow.callbacks.callback_requests import DagCallbackRequest, DagRunContext, TaskCallbackRequest
4444
from airflow.configuration import conf
4545
from airflow.dag_processing.bundles.base import BundleUsageTrackingManager
@@ -768,6 +768,7 @@ def process_executor_events(
768768
select(TI)
769769
.where(filter_for_tis)
770770
.options(selectinload(TI.dag_model))
771+
.options(selectinload(TI.dag_run).selectinload(DagRun.consumed_asset_events))
771772
.options(joinedload(TI.dag_version))
772773
)
773774
# row lock this entire set of taskinstances to make sure the scheduler doesn't fail when we have
@@ -888,7 +889,7 @@ def process_executor_events(
888889
ti=ti,
889890
msg=msg,
890891
context_from_server=TIRunContext(
891-
dag_run=ti.dag_run,
892+
dag_run=DRDataModel.model_validate(ti.dag_run, from_attributes=True),
892893
max_tries=ti.max_tries,
893894
variables=[],
894895
connections=[],
@@ -2266,7 +2267,7 @@ def _purge_task_instances_without_heartbeats(
22662267
ti=ti,
22672268
msg=str(task_instance_heartbeat_timeout_message_details),
22682269
context_from_server=TIRunContext(
2269-
dag_run=ti.dag_run,
2270+
dag_run=DRDataModel.model_validate(ti.dag_run, from_attributes=True),
22702271
max_tries=ti.max_tries,
22712272
variables=[],
22722273
connections=[],

0 commit comments

Comments
 (0)