|
38 | 38 | from sqlalchemy.sql import expression
|
39 | 39 |
|
40 | 40 | from airflow import settings
|
41 |
| -from airflow.api_fastapi.execution_api.datamodels.taskinstance import TIRunContext |
| 41 | +from airflow.api_fastapi.execution_api.datamodels.taskinstance import DagRun as DRDataModel, TIRunContext |
42 | 42 | from airflow.callbacks.callback_requests import DagCallbackRequest, DagRunContext, TaskCallbackRequest
|
43 | 43 | from airflow.configuration import conf
|
44 | 44 | from airflow.dag_processing.bundles.base import BundleUsageTrackingManager
|
@@ -827,6 +827,7 @@ def process_executor_events(
|
827 | 827 | select(TI)
|
828 | 828 | .where(filter_for_tis)
|
829 | 829 | .options(selectinload(TI.dag_model))
|
| 830 | + .options(selectinload(TI.dag_run).selectinload(DagRun.consumed_asset_events)) |
830 | 831 | .options(joinedload(TI.dag_version))
|
831 | 832 | )
|
832 | 833 | # row lock this entire set of taskinstances to make sure the scheduler doesn't fail when we have
|
@@ -947,7 +948,7 @@ def process_executor_events(
|
947 | 948 | ti=ti,
|
948 | 949 | msg=msg,
|
949 | 950 | context_from_server=TIRunContext(
|
950 |
| - dag_run=ti.dag_run, |
| 951 | + dag_run=DRDataModel.model_validate(ti.dag_run, from_attributes=True), |
951 | 952 | max_tries=ti.max_tries,
|
952 | 953 | variables=[],
|
953 | 954 | connections=[],
|
@@ -2308,7 +2309,7 @@ def _purge_task_instances_without_heartbeats(
|
2308 | 2309 | ti=ti,
|
2309 | 2310 | msg=str(task_instance_heartbeat_timeout_message_details),
|
2310 | 2311 | context_from_server=TIRunContext(
|
2311 |
| - dag_run=ti.dag_run, |
| 2312 | + dag_run=DRDataModel.model_validate(ti.dag_run, from_attributes=True), |
2312 | 2313 | max_tries=ti.max_tries,
|
2313 | 2314 | variables=[],
|
2314 | 2315 | connections=[],
|
|
0 commit comments