@@ -180,12 +180,6 @@ def teardown_method(self) -> None:
        clear_db_dags()
        clear_db_assets()

-    @staticmethod
-    def _clean_up(dag_id: str):
-        with create_session() as session:
-            session.query(DagRun).filter(DagRun.dag_id == dag_id).delete(synchronize_session=False)
-            session.query(TI).filter(TI.dag_id == dag_id).delete(synchronize_session=False)
-
    @staticmethod
    def _occur_before(a, b, list_):
        """
@@ -977,8 +971,6 @@ def add_failed_dag_run(dag, id, logical_date):
        )
        add_failed_dag_run(dag, "2", TEST_DATE + timedelta(days=1))
        assert dag.get_is_paused()
-        dag.clear()
-        self._clean_up(dag_id)

    def test_dag_is_deactivated_upon_dagfile_deletion(self, dag_maker):
        dag_id = "old_existing_dag"
@@ -1038,8 +1030,6 @@ def test_schedule_dag_no_previous_runs(self):
        )
        assert dag_run.state == State.RUNNING
        assert dag_run.run_type != DagRunType.MANUAL
-        dag.clear()
-        self._clean_up(dag_id)

    @patch("airflow.models.dag.Stats")
    def test_dag_handle_callback_crash(self, mock_stats):
@@ -1080,9 +1070,6 @@ def test_dag_handle_callback_crash(self, mock_stats):
            tags={"dag_id": "test_dag_callback_crash"},
        )

-        dag.clear()
-        self._clean_up(dag_id)
-
    def test_dag_handle_callback_with_removed_task(self, dag_maker, session):
        """
        Tests avoid crashes when a removed task is the last one in the list of task instance
@@ -1118,9 +1105,6 @@ def test_dag_handle_callback_with_removed_task(self, dag_maker, session):
        dag.handle_callback(dag_run, success=True)
        dag.handle_callback(dag_run, success=False)

-        dag.clear()
-        self._clean_up(dag_id)
-
    @pytest.mark.parametrize("catchup,expected_next_dagrun", [(True, DEFAULT_DATE), (False, None)])
    def test_next_dagrun_after_fake_scheduled_previous(self, catchup, expected_next_dagrun):
        """
@@ -1158,8 +1142,6 @@ def test_next_dagrun_after_fake_scheduled_previous(self, catchup, expected_next_
        assert model.next_dagrun == expected_next_dagrun
        assert model.next_dagrun_create_after == expected_next_dagrun + delta

-        self._clean_up(dag_id)
-
    def test_schedule_dag_once(self):
        """
        Tests scheduling a dag scheduled for @once - should be scheduled the first time
@@ -1188,7 +1170,6 @@ def test_schedule_dag_once(self):

        assert model.next_dagrun is None
        assert model.next_dagrun_create_after is None
-        self._clean_up(dag_id)

    def test_fractional_seconds(self):
        """
@@ -1213,7 +1194,6 @@ def test_fractional_seconds(self):

        assert start_date == run.logical_date, "dag run logical_date loses precision"
        assert start_date == run.start_date, "dag run start_date loses precision"
-        self._clean_up(dag_id)

    def test_rich_comparison_ops(self):
        test_dag_id = "test_rich_comparison_ops"
@@ -1397,46 +1377,39 @@ def test_dag_add_task_sets_default_task_group(self):
        assert dag.get_task("task_group.task_with_task_group") == task_with_task_group

    @pytest.mark.parametrize("dag_run_state", [DagRunState.QUEUED, DagRunState.RUNNING])
-    def test_clear_set_dagrun_state(self, dag_run_state):
+    @pytest.mark.need_serialized_dag
+    def test_clear_set_dagrun_state(self, dag_run_state, dag_maker, session):
        dag_id = "test_clear_set_dagrun_state"
-        self._clean_up(dag_id)
-        task_id = "t1"
-        dag = DAG(dag_id, schedule=None, start_date=DEFAULT_DATE, max_active_runs=1)
-        t_1 = EmptyOperator(task_id=task_id, dag=dag)

-        session = settings.Session()
-        dagrun_1 = _create_dagrun(
-            dag,
+        with dag_maker(dag_id, start_date=DEFAULT_DATE, max_active_runs=1) as dag:
+            task_id = "t1"
+            EmptyOperator(task_id=task_id)
+
+        dr = dag_maker.create_dagrun(
            run_type=DagRunType.BACKFILL_JOB,
            state=State.FAILED,
            start_date=DEFAULT_DATE,
            logical_date=DEFAULT_DATE,
-            data_interval=(DEFAULT_DATE, DEFAULT_DATE),
+            session=session,
        )
-        session.merge(dagrun_1)
-
-        task_instance_1 = TI(t_1, run_id=dagrun_1.run_id, state=State.RUNNING)
-        task_instance_1.refresh_from_db()
-        session.merge(task_instance_1)
        session.commit()
+        session.refresh(dr)
+        assert dr.state == "failed"

        dag.clear(
            start_date=DEFAULT_DATE,
            end_date=DEFAULT_DATE + datetime.timedelta(days=1),
            dag_run_state=dag_run_state,
            session=session,
        )
-        dagruns = session.query(DagRun).filter(DagRun.dag_id == dag_id).all()
-
-        assert len(dagruns) == 1
-        dagrun: DagRun = dagruns[0]
-        assert dagrun.state == dag_run_state
+        session.refresh(dr)
+        assert dr.state == dag_run_state

    @pytest.mark.parametrize("dag_run_state", [DagRunState.QUEUED, DagRunState.RUNNING])
    @pytest.mark.need_serialized_dag
    def test_clear_set_dagrun_state_for_mapped_task(self, dag_maker, dag_run_state):
        dag_id = "test_clear_set_dagrun_state"
-        self._clean_up(dag_id)
+
        task_id = "t1"

        with dag_maker(dag_id, schedule=None, start_date=DEFAULT_DATE, max_active_runs=1) as dag:
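
The dag_maker-based version above builds the DAG inside the fixture's context manager, creates the run through dag_maker.create_dagrun, and asserts on the refreshed DagRun row. A condensed, self-contained sketch of that pattern, assuming the dag_maker and session pytest fixtures from Airflow's own test harness (not a public API) and an EmptyOperator import path that differs across versions:

# Condensed sketch of the fixture-based pattern; it only runs inside Airflow's
# test suite, where the dag_maker and session fixtures exist (an assumption here).
import datetime

import pytest

from airflow.operators.empty import EmptyOperator  # import path is an assumption
from airflow.utils.state import DagRunState, State
from airflow.utils.types import DagRunType

DEFAULT_DATE = datetime.datetime(2016, 1, 1, tzinfo=datetime.timezone.utc)


@pytest.mark.need_serialized_dag
@pytest.mark.parametrize("dag_run_state", [DagRunState.QUEUED, DagRunState.RUNNING])
def test_clear_sets_dagrun_state(dag_run_state, dag_maker, session):
    with dag_maker("example_clear_set_dagrun_state", start_date=DEFAULT_DATE, max_active_runs=1) as dag:
        EmptyOperator(task_id="t1")

    # The fixture inserts the DagRun (and its task instances) for the serialized DAG.
    dr = dag_maker.create_dagrun(
        run_type=DagRunType.BACKFILL_JOB,
        state=State.FAILED,
        start_date=DEFAULT_DATE,
        logical_date=DEFAULT_DATE,
        session=session,
    )
    session.commit()

    # Clearing the interval that contains the run resets it to the requested state.
    dag.clear(
        start_date=DEFAULT_DATE,
        end_date=DEFAULT_DATE + datetime.timedelta(days=1),
        dag_run_state=dag_run_state,
        session=session,
    )
    session.refresh(dr)
    assert dr.state == dag_run_state
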
@@ -1611,32 +1584,37 @@ def test_clear_dag(
        self,
        ti_state_begin: TaskInstanceState | None,
        ti_state_end: TaskInstanceState | None,
+        dag_maker,
+        session,
    ):
        dag_id = "test_clear_dag"
-        self._clean_up(dag_id)
+
        task_id = "t1"
-        dag = DAG(dag_id, schedule=None, start_date=DEFAULT_DATE, max_active_runs=1)
-        _ = EmptyOperator(task_id=task_id, dag=dag)
+        with dag_maker(
+            dag_id,
+            schedule=None,
+            start_date=DEFAULT_DATE,
+            max_active_runs=1,
+            serialized=True,
+        ) as dag:
+            EmptyOperator(task_id=task_id)

        session = settings.Session()  # type: ignore
-        dagrun_1 = dag.create_dagrun(
+        dagrun_1 = dag_maker.create_dagrun(
            run_id="backfill",
            run_type=DagRunType.BACKFILL_JOB,
            state=DagRunState.RUNNING,
            start_date=DEFAULT_DATE,
            logical_date=DEFAULT_DATE,
-            data_interval=(DEFAULT_DATE, DEFAULT_DATE),
-            run_after=DEFAULT_DATE,
-            triggered_by=DagRunTriggeredByType.TEST,
+            # triggered_by=DagRunTriggeredByType.TEST,
+            session=session,
        )
-        session.merge(dagrun_1)

-        task_instance_1 = dagrun_1.get_task_instance(task_id)
+        task_instance_1 = dagrun_1.get_task_instance(task_id, session=session)
        if TYPE_CHECKING:
            assert task_instance_1
        task_instance_1.state = ti_state_begin
        task_instance_1.job_id = 123
-        session.merge(task_instance_1)
        session.commit()

        dag.clear(
@@ -1650,7 +1628,6 @@ def test_clear_dag(
        assert len(task_instances) == 1
        task_instance: TI = task_instances[0]
        assert task_instance.state == ti_state_end
-        self._clean_up(dag_id)

    def test_next_dagrun_info_once(self):
        dag = DAG("test_scheduler_dagrun_once", start_date=timezone.datetime(2015, 1, 1), schedule="@once")
@@ -2508,7 +2485,12 @@ def test_count_number_queries(self, tasks_count):
def test_set_task_instance_state(run_id, session, dag_maker):
    """Test that set_task_instance_state updates the TaskInstance state and clear downstream failed"""
    start_date = datetime_tz(2020, 1, 1)
-    with dag_maker("test_set_task_instance_state", start_date=start_date, session=session) as dag:
+    with dag_maker(
+        "test_set_task_instance_state",
+        start_date=start_date,
+        session=session,
+        serialized=True,
+    ) as dag:
        task_1 = EmptyOperator(task_id="task_1")
        task_2 = EmptyOperator(task_id="task_2")
        task_3 = EmptyOperator(task_id="task_3")
@@ -2646,7 +2628,12 @@ def consumer(value):
def test_set_task_group_state(session, dag_maker):
    """Test that set_task_group_state updates the TaskGroup state and clear downstream failed"""
    start_date = datetime_tz(2020, 1, 1)
-    with dag_maker("test_set_task_group_state", start_date=start_date, session=session) as dag:
+    with dag_maker(
+        "test_set_task_group_state",
+        start_date=start_date,
+        session=session,
+        serialized=True,
+    ) as dag:
        start = EmptyOperator(task_id="start")

        with TaskGroup("section_1", tooltip="Tasks for section_1") as section_1: