Skip to content

Commit ed7e05f

Browse files
[v3-0-test] Prevent repeated warning of triggers being added twice in triggerer (#54438) (#54441)
When running multiple deferrable tasks in parallel, triggers are repeatedly added to the triggerer's creation queue by the update_triggers method. They are then skipped with a warning in the method that creates the triggers. When many tasks are deferred, the number of warnings becomes excessive and causes inefficiency. The fix is to exclude the already-queued triggers from the set of triggers to create. I purposely left the warning log for encountering a trigger already in to_create in place, in case there is another way this could still happen. (cherry picked from commit bfd0bbe) Co-authored-by: Ephraim Anierobi <[email protected]>
1 parent 5c07811 commit ed7e05f

File tree

2 files changed

+78
-0
lines changed

2 files changed

+78
-0
lines changed

airflow-core/src/airflow/jobs/triggerer_job_runner.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -574,6 +574,7 @@ def update_triggers(self, requested_trigger_ids: set[int]):
574574
self.running_triggers.union(x[0] for x in self.events)
575575
.union(self.cancelling_triggers)
576576
.union(trigger[0] for trigger in self.failed_triggers)
577+
.union(trigger.id for trigger in self.creating_triggers)
577578
)
578579
# Work out the two difference sets
579580
new_trigger_ids = requested_trigger_ids - known_trigger_ids

airflow-core/tests/unit/jobs/test_triggerer_job.py

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -988,3 +988,80 @@ async def test_trigger_can_fetch_dag_run_count_ti_count_in_deferrable(session, d
988988
assert task_instance.next_kwargs == {
989989
"event": {"ti_count": 1, "dr_count": 1, "task_states": {"test": {"parent_task": "success"}}}
990990
}
991+
992+
993+
def test_update_triggers_prevents_duplicate_creation_queue_entries(session, supervisor_builder):
    """
    Requesting a trigger that is already waiting in the creation queue
    must not enqueue it a second time.
    """
    delta_trigger = TimeDeltaTrigger(datetime.timedelta(days=7))
    # create_trigger_in_db returns (dag_model, run, trigger_orm, task_instance);
    # only the ORM trigger row is needed here.
    created = create_trigger_in_db(session, delta_trigger)
    trigger_orm = created[2]

    supervisor = supervisor_builder()

    # Round 1 queues the trigger for creation; round 2 repeats the same
    # request and must leave the queue holding exactly that one entry.
    for _ in range(2):
        supervisor.update_triggers({trigger_orm.id})
        assert [queued.id for queued in supervisor.creating_triggers] == [trigger_orm.id]

    # The trigger is still only queued, so it must not be reported as running...
    assert trigger_orm.id not in supervisor.running_triggers

    # ...nor show up in any of the other tracking collections.
    assert trigger_orm.id not in supervisor.cancelling_triggers
    assert all(event_id != trigger_orm.id for event_id, _ in supervisor.events)
    assert all(failed_id != trigger_orm.id for failed_id, _ in supervisor.failed_triggers)
1020+
1021+
1022+
def test_update_triggers_prevents_duplicate_creation_queue_entries_with_multiple_triggers(
    session, supervisor_builder, dag_maker
):
    """
    With several triggers already waiting in the creation queue, repeated
    update_triggers calls must not enqueue any of them again.
    """
    week_trigger = TimeDeltaTrigger(datetime.timedelta(days=7))
    fortnight_trigger = TimeDeltaTrigger(datetime.timedelta(days=14))

    # First trigger: the helper returns (dag_model, run, trigger_orm, task_instance).
    first_created = create_trigger_in_db(session, week_trigger)
    trigger_orm1 = first_created[2]

    # Second trigger: built by hand on a separate DAG and attached to its task instance.
    with dag_maker("test_dag_2"):
        EmptyOperator(task_id="test_ti_2")

    second_run = dag_maker.create_dagrun()
    trigger_orm2 = Trigger.from_object(fortnight_trigger)
    second_ti = second_run.task_instances[0]
    session.add(trigger_orm2)
    session.flush()
    second_ti.trigger_id = trigger_orm2.id
    session.merge(second_ti)
    session.flush()

    supervisor = supervisor_builder()

    # Round 1 queues both triggers, round 2 repeats the same request, and
    # round 3 asks for only the first one. After every round the creation
    # queue must hold exactly two entries covering both trigger ids.
    both_triggers = (trigger_orm1, trigger_orm2)
    for requested in (both_triggers, both_triggers, (trigger_orm1,)):
        supervisor.update_triggers({orm.id for orm in requested})
        assert len(supervisor.creating_triggers) == 2
        queued_ids = {queued.id for queued in supervisor.creating_triggers}
        assert {trigger_orm1.id, trigger_orm2.id} <= queued_ids

0 commit comments

Comments
 (0)