Skip to content

Commit e821915

Browse files
committed
Remove unmap method from scheduler-side
Remove unnecessary unmap() functionality from server-side operators to simplify the scheduler architecture and eliminate synthetic operator creation.

Key changes:
- Remove the unmap() method from the MappedOperator class
- Update TaskInstance.fetch_handle_failure_context() to use the original task directly
- Remove the unmap() call from SerializedBaseOperator.get_extra_links()
- Update related tests to verify serialization without unmap functionality

The scheduler no longer needs to 'unmap' operators since:
- Callbacks are handled by the DAG processor, not the scheduler
- Email settings and fail-fast logic work with the original task
- Extra links work consistently between regular and mapped operators

This eliminates the TODO comment about moving runtime unmap to the task runner and provides a cleaner separation between scheduler and execution concerns.

Includes pre-commit formatting fixes applied by ruff and ruff-format.
1 parent a63fc7b commit e821915

File tree

5 files changed

+4
-101
lines changed

5 files changed

+4
-101
lines changed

airflow-core/src/airflow/models/mappedoperator.py

Lines changed: 0 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -440,30 +440,6 @@ def expand_start_from_trigger(self, *, context: Context) -> bool:
440440
def expand_start_trigger_args(self, *, context: Context) -> StartTriggerArgs | None:
441441
raise NotImplementedError
442442

443-
def unmap(self, resolve: None) -> SerializedBaseOperator:
444-
"""
445-
Get the "normal" Operator after applying the current mapping.
446-
447-
The *resolve* argument is never used and should always be *None*. It
448-
exists only to match the signature of the non-serialized implementation.
449-
450-
The return value is a SerializedBaseOperator that "looks like" the
451-
actual unmapping result.
452-
453-
:meta private:
454-
"""
455-
# After a mapped operator is serialized, there's no real way to actually
456-
# unmap it since we've lost access to the underlying operator class.
457-
# This tries its best to simply "forward" all the attributes on this
458-
# mapped operator to a new SerializedBaseOperator instance.
459-
sop = SerializedBaseOperator(task_id=self.task_id, params=self.params, _airflow_from_mapped=True)
460-
for partial_attr, value in self.partial_kwargs.items():
461-
setattr(sop, partial_attr, value)
462-
SerializedBaseOperator.populate_operator(sop, self.operator_class)
463-
if self.dag is not None: # For Mypy; we only serialize tasks in a DAG so the check always satisfies.
464-
SerializedBaseOperator.set_task_dag_references(sop, self.dag)
465-
return sop
466-
467443

468444
@functools.singledispatch
469445
def get_mapped_ti_count(task: DAGNode, run_id: str, *, session: Session) -> int:

airflow-core/src/airflow/models/taskinstance.py

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1610,18 +1610,13 @@ def fetch_handle_failure_context(
16101610
# only mark task instance as FAILED if the next task instance
16111611
# try_number exceeds the max_tries ... or if force_fail is truthy
16121612

1613-
task: SerializedBaseOperator | None = None
1614-
try:
1615-
if (orig_task := getattr(ti, "task", None)) and context:
1616-
# TODO (GH-52141): Move runtime unmap into task runner.
1617-
task = orig_task.unmap((context, session))
1618-
except Exception:
1619-
cls.logger().error("Unable to unmap task to determine if we need to send an alert email")
1613+
# Use the original task directly - scheduler only needs to check email settings
1614+
# Actual callbacks are handled by the DAG processor, not the scheduler
1615+
task = getattr(ti, "task", None)
16201616

16211617
if not ti.is_eligible_to_retry():
16221618
ti.state = TaskInstanceState.FAILED
16231619
email_for_state = operator.attrgetter("email_on_failure")
1624-
callbacks = task.on_failure_callback if task else None
16251620

16261621
if task and fail_fast:
16271622
_stop_remaining_tasks(task_instance=ti, session=session)
@@ -1634,7 +1629,6 @@ def fetch_handle_failure_context(
16341629

16351630
ti.state = State.UP_FOR_RETRY
16361631
email_for_state = operator.attrgetter("email_on_retry")
1637-
callbacks = task.on_retry_callback if task else None
16381632

16391633
try:
16401634
get_listener_manager().hook.on_task_instance_failed(
@@ -1647,7 +1641,6 @@ def fetch_handle_failure_context(
16471641
"ti": ti,
16481642
"email_for_state": email_for_state,
16491643
"task": task,
1650-
"callbacks": callbacks,
16511644
"context": context,
16521645
}
16531646

airflow-core/src/airflow/serialization/serialized_objects.py

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,6 @@
109109
from airflow.ti_deps.deps.base_ti_dep import BaseTIDep
110110
from airflow.timetables.base import DagRunInfo, DataInterval, Timetable
111111
from airflow.triggers.base import BaseEventTrigger
112-
from airflow.typing_compat import Self
113112

114113
HAS_KUBERNETES: bool
115114
try:
@@ -1309,7 +1308,7 @@ def get_extra_links(self, ti: TaskInstance, name: str) -> str | None:
13091308
link = self.operator_extra_link_dict.get(name) or self.global_operator_extra_link_dict.get(name)
13101309
if not link:
13111310
return None
1312-
return link.get_link(self.unmap(None), ti_key=ti.key) # type: ignore[arg-type] # TODO: GH-52141 - BaseOperatorLink.get_link expects BaseOperator but receives SerializedBaseOperator
1311+
return link.get_link(self, ti_key=ti.key) # type: ignore[arg-type] # TODO: GH-52141 - BaseOperatorLink.get_link expects BaseOperator but receives SerializedBaseOperator
13131312

13141313
@property
13151314
def task_type(self) -> str:
@@ -1758,9 +1757,6 @@ def expand_start_from_trigger(self, *, context: Context) -> bool:
17581757
def get_serialized_fields(self):
17591758
return BaseOperator.get_serialized_fields()
17601759

1761-
def unmap(self, resolve: None) -> Self:
1762-
return self
1763-
17641760
def _iter_all_mapped_downstreams(self) -> Iterator[MappedOperator | MappedTaskGroup]:
17651761
"""
17661762
Return mapped nodes that are direct dependencies of the current task.

airflow-core/tests/unit/serialization/test_dag_serialization.py

Lines changed: 0 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,6 @@
6262
from airflow.models.xcom import XCOM_RETURN_KEY, XComModel
6363
from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator
6464
from airflow.providers.standard.operators.bash import BashOperator
65-
from airflow.providers.standard.sensors.bash import BashSensor
6665
from airflow.sdk import AssetAlias, BaseHook, teardown
6766
from airflow.sdk.bases.decorator import DecoratedOperator
6867
from airflow.sdk.bases.operator import BaseOperator
@@ -2718,44 +2717,6 @@ def test_operator_expand_kwargs_xcomarg_serde(strict):
27182717
assert xcom_arg.operator is serialized_dag.task_dict["op1"]
27192718

27202719

2721-
def test_operator_expand_deserialized_unmap():
2722-
"""Unmap a deserialized mapped operator should be similar to deserializing an non-mapped operator."""
2723-
normal = BashOperator(task_id="a", bash_command=[1, 2], executor_config={"a": "b"})
2724-
mapped = BashOperator.partial(task_id="a", executor_config={"a": "b"}).expand(bash_command=[1, 2])
2725-
2726-
ser_mapped = BaseSerialization.serialize(mapped)
2727-
deser_mapped = BaseSerialization.deserialize(ser_mapped)
2728-
deser_mapped.dag = None
2729-
2730-
ser_normal = BaseSerialization.serialize(normal)
2731-
deser_normal = BaseSerialization.deserialize(ser_normal)
2732-
deser_normal.dag = None
2733-
unmapped_deser_mapped = deser_mapped.unmap(None)
2734-
2735-
assert type(unmapped_deser_mapped) is type(deser_normal) is SerializedBaseOperator
2736-
assert unmapped_deser_mapped.task_id == deser_normal.task_id == "a"
2737-
assert unmapped_deser_mapped.executor_config == deser_normal.executor_config == {"a": "b"}
2738-
2739-
2740-
@pytest.mark.db_test
2741-
def test_sensor_expand_deserialized_unmap():
2742-
"""Unmap a deserialized mapped sensor should be similar to deserializing a non-mapped sensor"""
2743-
dag = DAG(dag_id="hello", schedule=None, start_date=None)
2744-
with dag:
2745-
normal = BashSensor(task_id="a", bash_command=[1, 2], mode="reschedule")
2746-
mapped = BashSensor.partial(task_id="b", mode="reschedule").expand(bash_command=[1, 2])
2747-
ser_mapped = SerializedBaseOperator.serialize(mapped)
2748-
deser_mapped = SerializedBaseOperator.deserialize(ser_mapped)
2749-
deser_mapped.dag = dag
2750-
deser_unmapped = deser_mapped.unmap(None)
2751-
ser_normal = SerializedBaseOperator.serialize(normal)
2752-
deser_normal = SerializedBaseOperator.deserialize(ser_normal)
2753-
comps = set(BashSensor._comps)
2754-
comps.remove("task_id")
2755-
comps.remove("dag_id")
2756-
assert all(getattr(deser_unmapped, c, None) == getattr(deser_normal, c, None) for c in comps)
2757-
2758-
27592720
def test_task_resources_serde():
27602721
"""
27612722
Test task resources serialization/deserialization.

airflow-core/tests/unit/serialization/test_serialized_objects.py

Lines changed: 0 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -494,29 +494,6 @@ def test_backcompat_deserialize_connection(conn_uri):
494494
assert deserialized.get_uri() == conn_uri
495495

496496

497-
@pytest.mark.db_test
498-
def test_serialized_mapped_operator_unmap(dag_maker):
499-
from airflow.serialization.serialized_objects import SerializedDAG
500-
501-
from tests_common.test_utils.mock_operators import MockOperator
502-
503-
with dag_maker(dag_id="dag") as dag:
504-
MockOperator(task_id="task1", arg1="x")
505-
MockOperator.partial(task_id="task2").expand(arg1=["a", "b"])
506-
507-
serialized_dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
508-
assert serialized_dag.dag_id == "dag"
509-
510-
serialized_task1 = serialized_dag.get_task("task1")
511-
assert serialized_task1.dag is serialized_dag
512-
513-
serialized_task2 = serialized_dag.get_task("task2")
514-
assert serialized_task2.dag is serialized_dag
515-
516-
serialized_unmapped_task = serialized_task2.unmap(None)
517-
assert serialized_unmapped_task.dag is serialized_dag
518-
519-
520497
def test_ser_of_asset_event_accessor():
521498
# todo: (Airflow 3.0) we should force reserialization on upgrade
522499
d = OutletEventAccessors()

0 commit comments

Comments
 (0)