Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions airflow-core/src/airflow/jobs/triggerer_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -574,6 +574,7 @@ def update_triggers(self, requested_trigger_ids: set[int]):
self.running_triggers.union(x[0] for x in self.events)
.union(self.cancelling_triggers)
.union(trigger[0] for trigger in self.failed_triggers)
.union(trigger.id for trigger in self.creating_triggers)
)
# Work out the two difference sets
new_trigger_ids = requested_trigger_ids - known_trigger_ids
Expand Down
77 changes: 77 additions & 0 deletions airflow-core/tests/unit/jobs/test_triggerer_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -988,3 +988,80 @@ async def test_trigger_can_fetch_dag_run_count_ti_count_in_deferrable(session, d
assert task_instance.next_kwargs == {
"event": {"ti_count": 1, "dr_count": 1, "task_states": {"test": {"parent_task": "success"}}}
}


def test_update_triggers_prevents_duplicate_creation_queue_entries(session, supervisor_builder):
    """
    Check that ``update_triggers`` does not re-queue a trigger that is already awaiting creation.

    The same trigger id is requested twice in a row; after each request the
    creation queue must contain exactly one entry carrying that id, and the id
    must not show up in any of the supervisor's other tracking collections.
    """
    _dag_model, _run, trigger_orm, _task_instance = create_trigger_in_db(
        session, TimeDeltaTrigger(datetime.timedelta(days=7))
    )
    supervisor = supervisor_builder()

    # Round one queues the trigger for creation; round two asks for the very
    # same id again and must leave the queue exactly as it was.
    for _ in range(2):
        supervisor.update_triggers({trigger_orm.id})
        assert len(supervisor.creating_triggers) == 1
        assert supervisor.creating_triggers[0].id == trigger_orm.id

    # The trigger is only queued, so it must not be reported as running ...
    assert trigger_orm.id not in supervisor.running_triggers

    # ... nor be present in the cancelling, events or failed collections.
    assert trigger_orm.id not in supervisor.cancelling_triggers
    assert all(event_trigger_id != trigger_orm.id for event_trigger_id, _ in supervisor.events)
    assert all(failed_trigger_id != trigger_orm.id for failed_trigger_id, _ in supervisor.failed_triggers)


def test_update_triggers_prevents_duplicate_creation_queue_entries_with_multiple_triggers(
    session, supervisor_builder, dag_maker
):
    """
    Check that ``update_triggers`` does not re-queue any of several triggers already awaiting creation.

    Two triggers are stored, each attached to its own task instance. Both ids
    are requested twice, then only the first id is requested once more; after
    every request the creation queue must still hold two entries covering both
    trigger ids.
    """
    week_trigger = TimeDeltaTrigger(datetime.timedelta(days=7))
    fortnight_trigger = TimeDeltaTrigger(datetime.timedelta(days=14))

    _dag_model1, _run1, trigger_orm1, _task_instance1 = create_trigger_in_db(session, week_trigger)

    # The second trigger gets its own DAG, run and task instance.
    with dag_maker("test_dag_2"):
        EmptyOperator(task_id="test_ti_2")

    second_run = dag_maker.create_dagrun()
    trigger_orm2 = Trigger.from_object(fortnight_trigger)
    second_ti = second_run.task_instances[0]
    session.add(trigger_orm2)
    # Flush so that trigger_orm2.id is populated before it is linked to the task instance.
    session.flush()
    second_ti.trigger_id = trigger_orm2.id
    session.merge(second_ti)
    session.flush()

    supervisor = supervisor_builder()

    def assert_queue_holds_both_triggers():
        # Two entries in total, and both trigger ids are among them.
        assert len(supervisor.creating_triggers) == 2
        queued_ids = {queued.id for queued in supervisor.creating_triggers}
        assert trigger_orm1.id in queued_ids
        assert trigger_orm2.id in queued_ids

    requests = (
        # First request: both triggers are added to the creation queue.
        {trigger_orm1.id, trigger_orm2.id},
        # Same ids again: nothing may be added a second time.
        {trigger_orm1.id, trigger_orm2.id},
        # Only the first id: still no duplicate entries.
        {trigger_orm1.id},
    )
    for requested_ids in requests:
        supervisor.update_triggers(requested_ids)
        assert_queue_holds_both_triggers()
Loading