Skip to content

Commit bfd0bbe

Browse files
authored
Prevent repeated warning of triggers being added twice in triggerer (#54438)
When running multiple deferrable tasks in parallel, the update_triggers method repeatedly adds the same triggers to the triggerer's creation queue. Each duplicate is then skipped with a warning when the queued triggers are created. With many deferred tasks, these warnings become excessive and cause inefficiency. The fix excludes triggers that are already queued for creation from the set of triggers to create. I purposely left in place the warning log for encountering a trigger that is already in to_create, in case there is another way this could still happen.
1 parent 956a950 commit bfd0bbe

File tree

2 files changed

+78
-0
lines changed

2 files changed

+78
-0
lines changed

airflow-core/src/airflow/jobs/triggerer_job_runner.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -603,6 +603,7 @@ def update_triggers(self, requested_trigger_ids: set[int]):
603603
self.running_triggers.union(x[0] for x in self.events)
604604
.union(self.cancelling_triggers)
605605
.union(trigger[0] for trigger in self.failed_triggers)
606+
.union(trigger.id for trigger in self.creating_triggers)
606607
)
607608
# Work out the two difference sets
608609
new_trigger_ids = requested_trigger_ids - known_trigger_ids

airflow-core/tests/unit/jobs/test_triggerer_job.py

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1006,3 +1006,80 @@ async def test_trigger_can_fetch_dag_run_count_ti_count_in_deferrable(session, d
10061006
assert task_instance.next_kwargs == {
10071007
"event": {"ti_count": 1, "dr_count": 1, "task_states": {"test": {"parent_task": "success"}}}
10081008
}
1009+
1010+
1011+
def test_update_triggers_prevents_duplicate_creation_queue_entries(session, supervisor_builder):
    """
    Requesting the same trigger id from update_triggers more than once must
    leave a single entry for it in the supervisor's creation queue.
    """
    delta_trigger = TimeDeltaTrigger(datetime.timedelta(days=7))
    _dag_model, _run, trigger_orm, _task_instance = create_trigger_in_db(session, delta_trigger)

    supervisor = supervisor_builder()

    # Round one queues the trigger for creation; round two repeats the same
    # request and must not append a second entry for it.
    for _ in range(2):
        supervisor.update_triggers({trigger_orm.id})
        assert len(supervisor.creating_triggers) == 1
        assert supervisor.creating_triggers[0].id == trigger_orm.id

    # The trigger is only queued so far, so it must not be listed as running.
    assert trigger_orm.id not in supervisor.running_triggers

    # Nor may it show up in any of the other tracking collections.
    assert trigger_orm.id not in supervisor.cancelling_triggers
    assert all(event_trigger_id != trigger_orm.id for event_trigger_id, _ in supervisor.events)
    assert all(failed_trigger_id != trigger_orm.id for failed_trigger_id, _ in supervisor.failed_triggers)
1038+
1039+
1040+
def test_update_triggers_prevents_duplicate_creation_queue_entries_with_multiple_triggers(
    session, supervisor_builder, dag_maker
):
    """
    With two triggers already queued for creation, further update_triggers
    calls naming either or both of them must not grow the creation queue.
    """
    week_trigger = TimeDeltaTrigger(datetime.timedelta(days=7))
    fortnight_trigger = TimeDeltaTrigger(datetime.timedelta(days=14))

    _dag_model1, _run1, trigger_orm1, _task_instance1 = create_trigger_in_db(session, week_trigger)

    # Build a second DAG run whose single task instance is tied to the second trigger.
    with dag_maker("test_dag_2"):
        EmptyOperator(task_id="test_ti_2")

    run2 = dag_maker.create_dagrun()
    trigger_orm2 = Trigger.from_object(fortnight_trigger)
    ti2 = run2.task_instances[0]
    session.add(trigger_orm2)
    session.flush()
    ti2.trigger_id = trigger_orm2.id
    session.merge(ti2)
    session.flush()

    supervisor = supervisor_builder()

    # Request both triggers (queues them), request both again, then request
    # only the first. After every call the queue must hold exactly these two.
    requests = (
        {trigger_orm1.id, trigger_orm2.id},
        {trigger_orm1.id, trigger_orm2.id},
        {trigger_orm1.id},
    )
    for requested_ids in requests:
        supervisor.update_triggers(requested_ids)
        assert len(supervisor.creating_triggers) == 2
        queued_ids = {queued.id for queued in supervisor.creating_triggers}
        assert trigger_orm1.id in queued_ids
        assert trigger_orm2.id in queued_ids

0 commit comments

Comments
 (0)