Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 0 additions & 24 deletions airflow-core/src/airflow/models/mappedoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -440,30 +440,6 @@ def expand_start_from_trigger(self, *, context: Context) -> bool:
def expand_start_trigger_args(self, *, context: Context) -> StartTriggerArgs | None:
raise NotImplementedError

def unmap(self, resolve: None) -> SerializedBaseOperator:
    """
    Build a plain operator that mirrors this mapped operator's attributes.

    Once a mapped operator has been serialized, the underlying operator
    class is no longer reachable, so a real unmap is not possible. Instead,
    a fresh SerializedBaseOperator is created and the attributes known to
    this mapped operator are forwarded onto it on a best-effort basis.

    :param resolve: Never used; should always be *None*. It exists only so
        the signature matches the non-serialized implementation.
    :return: A SerializedBaseOperator that "looks like" the actual
        unmapping result.

    :meta private:
    """
    unmapped = SerializedBaseOperator(
        task_id=self.task_id,
        params=self.params,
        _airflow_from_mapped=True,
    )
    # Forward every partial() keyword argument onto the new instance before
    # handing it to populate_operator together with the operator class.
    for attr_name, attr_value in self.partial_kwargs.items():
        setattr(unmapped, attr_name, attr_value)
    SerializedBaseOperator.populate_operator(unmapped, self.operator_class)

    # For Mypy; we only serialize tasks in a DAG, so a missing DAG is not
    # expected here in practice.
    if self.dag is None:
        return unmapped
    SerializedBaseOperator.set_task_dag_references(unmapped, self.dag)
    return unmapped


@functools.singledispatch
def get_mapped_ti_count(task: DAGNode, run_id: str, *, session: Session) -> int:
Expand Down
13 changes: 3 additions & 10 deletions airflow-core/src/airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -1610,18 +1610,13 @@ def fetch_handle_failure_context(
# only mark task instance as FAILED if the next task instance
# try_number exceeds the max_tries ... or if force_fail is truthy

task: SerializedBaseOperator | None = None
try:
if (orig_task := getattr(ti, "task", None)) and context:
# TODO (GH-52141): Move runtime unmap into task runner.
task = orig_task.unmap((context, session))
except Exception:
cls.logger().error("Unable to unmap task to determine if we need to send an alert email")
# Use the original task directly - scheduler only needs to check email settings
# Actual callbacks are handled by the DAG processor, not the scheduler
task = getattr(ti, "task", None)

if not ti.is_eligible_to_retry():
ti.state = TaskInstanceState.FAILED
email_for_state = operator.attrgetter("email_on_failure")
callbacks = task.on_failure_callback if task else None

if task and fail_fast:
_stop_remaining_tasks(task_instance=ti, session=session)
Expand All @@ -1634,7 +1629,6 @@ def fetch_handle_failure_context(

ti.state = State.UP_FOR_RETRY
email_for_state = operator.attrgetter("email_on_retry")
callbacks = task.on_retry_callback if task else None

try:
get_listener_manager().hook.on_task_instance_failed(
Expand All @@ -1647,7 +1641,6 @@ def fetch_handle_failure_context(
"ti": ti,
"email_for_state": email_for_state,
"task": task,
"callbacks": callbacks,
"context": context,
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,6 @@
from airflow.ti_deps.deps.base_ti_dep import BaseTIDep
from airflow.timetables.base import DagRunInfo, DataInterval, Timetable
from airflow.triggers.base import BaseEventTrigger
from airflow.typing_compat import Self

HAS_KUBERNETES: bool
try:
Expand Down Expand Up @@ -1309,7 +1308,7 @@ def get_extra_links(self, ti: TaskInstance, name: str) -> str | None:
link = self.operator_extra_link_dict.get(name) or self.global_operator_extra_link_dict.get(name)
if not link:
return None
return link.get_link(self.unmap(None), ti_key=ti.key) # type: ignore[arg-type] # TODO: GH-52141 - BaseOperatorLink.get_link expects BaseOperator but receives SerializedBaseOperator
return link.get_link(self, ti_key=ti.key) # type: ignore[arg-type] # TODO: GH-52141 - BaseOperatorLink.get_link expects BaseOperator but receives SerializedBaseOperator
Comment on lines -1312 to +1311
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I wonder if this should be kept. Would users expect get_link to be called against a MappedOperator? It may not have the same attributes as the underlying operator.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

models.MappedOperator also defines `def get_extra_links` -- so either it is already broken, or this change makes no difference to it 🤷

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

def get_extra_links(self, ti: TaskInstance, name: str) -> str | None:
    """
    For an operator, gets the URLs that the ``extra_links`` entry points to.

    The link is looked up by *name*, first in ``operator_extra_link_dict`` and
    then in ``global_operator_extra_link_dict``.

    :meta private:

    :raise ValueError: The error message of a ValueError will be passed on through to
        the frontend to show up as a tooltip on the disabled link.
    :param ti: The TaskInstance for the URL being searched for.
    :param name: The name of the link we're looking for the URL for. Should be
        one of the options specified in ``extra_links``.
    :return: The result of the matching link's ``get_link``, or *None* when no
        link is found under *name*.
    """
    link = self.operator_extra_link_dict.get(name) or self.global_operator_extra_link_dict.get(name)
    if not link:
        return None
    return link.get_link(self, ti_key=ti.key)  # type: ignore[arg-type] # TODO: GH-52141 - BaseOperatorLink.get_link expects BaseOperator but receives MappedOperator

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With 3.0.0, get_link isn't called in the scheduler (or really the webserver) anymore - we get the link on the execution side and store it in XCom as an XComExtraLink (or something like that).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this only gets called for plugin registered global links.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually I think this was from a series of incorrect changes… get_link used to only accept BaseOperator before this change
#46613

You can see get_extra_links always calls unmap to get a BaseOperator.

After the PR above, get_extra_links was then incorrectly “restored” to pass in MappedOperator in #50238. That PR has not been in a release yet.

I think removing unmap here is therefore wrong. It should be kept for get_extra_links for compatibility.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@uranusjr #46613 is released in 3.0.0 and #50238 in 3.0.1

get_link used to only accept BaseOperator before this change

How would that work with custom operators, though? We don't serialize all attributes, so extra links that take the operator as an argument will have limited attributes to work with for global operator links.

Copy link
Member

@uranusjr uranusjr Aug 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When the thing was originally designed, we could get the original operator class from a dag bag… but yeah, I guess none of this applies anymore.

Now that we always do get_link in the worker (#54816 (comment)), we can always get the original operator class (you need to unmap for execution anyway), so I guess what we need to do is

  1. This PR is OK, we don’t need unmap at scheduler side
  2. Not have extra_links and get_extra_links on SerializedBaseOperator? (since nobody should access these in the scheduler and webserver; the XCom mechanism should be used instead)
  3. Make sure global links are also called and generated on execution time
  4. Restore get_link to only expect BaseOperator subclasses; may need to tweak execution slightly to make sure it’s only called after a MappedOperator is unmapped into a BaseOperator

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One extra wrinkle here @uranusjr is that plugin-based global operator links would still possibly work in the Webserver.


@property
def task_type(self) -> str:
Expand Down Expand Up @@ -1758,9 +1757,6 @@ def expand_start_from_trigger(self, *, context: Context) -> bool:
def get_serialized_fields(self):
return BaseOperator.get_serialized_fields()

def unmap(self, resolve: None) -> Self:
    """
    Return this operator unchanged.

    :param resolve: Not used by this implementation.
    :return: The very same instance the method was called on.
    """
    return self

def _iter_all_mapped_downstreams(self) -> Iterator[MappedOperator | MappedTaskGroup]:
"""
Return mapped nodes that are direct dependencies of the current task.
Expand Down
39 changes: 0 additions & 39 deletions airflow-core/tests/unit/serialization/test_dag_serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@
from airflow.models.xcom import XCOM_RETURN_KEY, XComModel
from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator
from airflow.providers.standard.operators.bash import BashOperator
from airflow.providers.standard.sensors.bash import BashSensor
from airflow.sdk import AssetAlias, BaseHook, teardown
from airflow.sdk.bases.decorator import DecoratedOperator
from airflow.sdk.bases.operator import BaseOperator
Expand Down Expand Up @@ -2718,44 +2717,6 @@ def test_operator_expand_kwargs_xcomarg_serde(strict):
assert xcom_arg.operator is serialized_dag.task_dict["op1"]


def test_operator_expand_deserialized_unmap():
    """Unmapping a deserialized mapped operator should resemble deserializing a non-mapped operator."""
    expected_executor_config = {"a": "b"}

    normal = BashOperator(task_id="a", bash_command=[1, 2], executor_config={"a": "b"})
    mapped = BashOperator.partial(task_id="a", executor_config={"a": "b"}).expand(bash_command=[1, 2])

    # Round-trip the mapped operator first, then the plain one; neither is
    # attached to a DAG for this comparison.
    roundtripped_mapped = BaseSerialization.deserialize(BaseSerialization.serialize(mapped))
    roundtripped_mapped.dag = None

    roundtripped_normal = BaseSerialization.deserialize(BaseSerialization.serialize(normal))
    roundtripped_normal.dag = None

    unmapped = roundtripped_mapped.unmap(None)

    # Both results must be exactly SerializedBaseOperator, not a subclass.
    assert type(unmapped) is type(roundtripped_normal)
    assert type(roundtripped_normal) is SerializedBaseOperator

    assert unmapped.task_id == roundtripped_normal.task_id
    assert roundtripped_normal.task_id == "a"

    assert unmapped.executor_config == roundtripped_normal.executor_config
    assert roundtripped_normal.executor_config == expected_executor_config


@pytest.mark.db_test
def test_sensor_expand_deserialized_unmap():
"""Unmapping a deserialized mapped sensor should be similar to deserializing a non-mapped sensor."""
# NOTE(review): indentation was lost in this view, so exactly which statements
# sit inside the ``with dag:`` block cannot be confirmed from here.
dag = DAG(dag_id="hello", schedule=None, start_date=None)
with dag:
normal = BashSensor(task_id="a", bash_command=[1, 2], mode="reschedule")
mapped = BashSensor.partial(task_id="b", mode="reschedule").expand(bash_command=[1, 2])
# Round-trip the mapped sensor through serialization, re-attach the DAG, then unmap it.
ser_mapped = SerializedBaseOperator.serialize(mapped)
deser_mapped = SerializedBaseOperator.deserialize(ser_mapped)
deser_mapped.dag = dag
deser_unmapped = deser_mapped.unmap(None)
# Round-trip the plain sensor to obtain the reference object.
ser_normal = SerializedBaseOperator.serialize(normal)
deser_normal = SerializedBaseOperator.deserialize(ser_normal)
# Compare every attribute named in BashSensor._comps except task_id and dag_id;
# the two sensors are created with different task ids ("a" and "b").
comps = set(BashSensor._comps)
comps.remove("task_id")
comps.remove("dag_id")
assert all(getattr(deser_unmapped, c, None) == getattr(deser_normal, c, None) for c in comps)


def test_task_resources_serde():
"""
Test task resources serialization/deserialization.
Expand Down
23 changes: 0 additions & 23 deletions airflow-core/tests/unit/serialization/test_serialized_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -494,29 +494,6 @@ def test_backcompat_deserialize_connection(conn_uri):
assert deserialized.get_uri() == conn_uri


@pytest.mark.db_test
def test_serialized_mapped_operator_unmap(dag_maker):
"""Check that unmapping a serialized mapped task yields an operator attached to the serialized DAG."""
from airflow.serialization.serialized_objects import SerializedDAG

from tests_common.test_utils.mock_operators import MockOperator

# NOTE(review): indentation was lost in this view; presumably only the two
# MockOperator statements belong to the ``with`` block -- confirm against the file.
with dag_maker(dag_id="dag") as dag:
MockOperator(task_id="task1", arg1="x")
MockOperator.partial(task_id="task2").expand(arg1=["a", "b"])

# Round-trip the DAG through its dict form to obtain the serialized DAG.
serialized_dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
assert serialized_dag.dag_id == "dag"

# Both the plain task and the mapped task must reference the serialized DAG.
serialized_task1 = serialized_dag.get_task("task1")
assert serialized_task1.dag is serialized_dag

serialized_task2 = serialized_dag.get_task("task2")
assert serialized_task2.dag is serialized_dag

# The operator produced by unmap must reference that same DAG object.
serialized_unmapped_task = serialized_task2.unmap(None)
assert serialized_unmapped_task.dag is serialized_dag


def test_ser_of_asset_event_accessor():
# todo: (Airflow 3.0) we should force reserialization on upgrade
d = OutletEventAccessors()
Expand Down
Loading