Skip to content

Conversation

kaxil
Copy link
Member

@kaxil kaxil commented Aug 22, 2025

The scheduler-side MappedOperator and SerializedBaseOperator classes contained unmap() functionality that was creating unnecessary complexity and architectural confusion. The unmap() method was attempting to synthesize "real" operators from serialized data, but this is not needed for scheduler operations.

Scheduler doesn't execute callbacks - that's handled by the DAG processor. So simplifying the codebase.

This would also be helpful for #54569

Remove unnecessary unmap() functionality from server-side operators to
simplify scheduler architecture and eliminate synthetic operator creation.

Key changes:
- Remove unmap() method from MappedOperator class
- Update TaskInstance.fetch_handle_failure_context() to use original task directly
- Remove unmap() call from SerializedBaseOperator.get_extra_links()
- Update related tests to verify serialization without unmap functionality

The scheduler no longer needs to 'unmap' operators since:
- Callbacks are handled by DAG processor, not scheduler
- Email settings and fail-fast logic work with original task
- Extra links work consistently between regular and mapped operators

This eliminates the TODO comment about moving runtime unmap to task runner
and provides cleaner separation between scheduler and execution concerns.

Includes pre-commit formatting fixes applied by ruff and ruff-format.
Comment on lines -1312 to +1311
return link.get_link(self.unmap(None), ti_key=ti.key) # type: ignore[arg-type] # TODO: GH-52141 - BaseOperatorLink.get_link expects BaseOperator but receives SerializedBaseOperator
return link.get_link(self, ti_key=ti.key) # type: ignore[arg-type] # TODO: GH-52141 - BaseOperatorLink.get_link expects BaseOperator but receives SerializedBaseOperator
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I wonder if this should be kept. Would users expect get_link to be called against a MappedOperator? It may not have the same attributes as the underlying operator.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

models.MappedOperator also defines def get_extra_links -- so either is already broken or there should be no change with this on it 🤷

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

def get_extra_links(self, ti: TaskInstance, name: str) -> str | None:
"""
For an operator, gets the URLs that the ``extra_links`` entry points to.
:meta private:
:raise ValueError: The error message of a ValueError will be passed on through to
the fronted to show up as a tooltip on the disabled link.
:param ti: The TaskInstance for the URL being searched for.
:param name: The name of the link we're looking for the URL for. Should be
one of the options specified in ``extra_links``.
"""
link = self.operator_extra_link_dict.get(name) or self.global_operator_extra_link_dict.get(name)
if not link:
return None
return link.get_link(self, ti_key=ti.key) # type: ignore[arg-type] # TODO: GH-52141 - BaseOperatorLink.get_link expects BaseOperator but receives MappedOperator

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With 3.0.0 get links aren't called in the scheduler or really the webserver anymore - we get the link in the execution side and store it in the xcom as an XComExtraLink (or something like that)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this only gets called for plugin registered global links.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually I think this was from a series of incorrect changes… get_link used to only accept BaseOperator before this change
#46613

You can see get_extra_links always calls unmap to get a BaseOperator.

After the PR above, get_extra_links was then incorrectly “restored” to pass in MappedOperator in #50238. This PR has not been a release yet.

I think removing unmap here is therefore wrong. It should be kept for get_extra_links for compatibility.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@uranusjr #46613 is released in 3.0.0 and #50238 in 3.0.1

get_link used to only accept BaseOperator before this change

How would that work with CustomOperators though! We don't serialize all attributes so extra_links with operator as argument will have limited attributes to work with for global op links

Copy link
Member

@uranusjr uranusjr Aug 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When the thing was original designed we could get the original operator class from a dag bag… but yeah I guess all these no longer apply now.

Now that we always do get_link in the worker (#54816 (comment)), we can always get the original operator class (you need to unmap for execution anyway), so I guess what we need to do is

  1. This PR is OK, we don’t need unmap at scheduler side
  2. Not have extra_links and get_extra_links on SerializedBaseOperator? (since nobody should access these in the scheduler and webserver; the XCom mnechanism should be used instead)
  3. Make sure global links are also called and generated on execution time
  4. Restore get_link to only be expect BaseOperator subclasses; may need to tweak execution slightly to make sure it’s only called after a MappedOperator is unmapped into a BaseOperator

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One extra wrinkle here @uranusjr is that plugon-based global operator links would still possible work in the Webserver.

@kaxil kaxil merged commit cfce573 into apache:main Aug 23, 2025
57 checks passed
@kaxil kaxil deleted the remove-unmap-from-scheduler branch August 23, 2025 22:49
mangal-vairalkar pushed a commit to mangal-vairalkar/airflow that referenced this pull request Aug 30, 2025
Remove unnecessary unmap() functionality from server-side operators to
simplify scheduler architecture and eliminate synthetic operator creation.

Key changes:
- Remove unmap() method from MappedOperator class
- Update TaskInstance.fetch_handle_failure_context() to use original task directly
- Remove unmap() call from SerializedBaseOperator.get_extra_links()
- Update related tests to verify serialization without unmap functionality

The scheduler no longer needs to 'unmap' operators since:
- Callbacks are handled by DAG processor, not scheduler
- Email settings and fail-fast logic work with original task
- Extra links work consistently between regular and mapped operators

This eliminates the TODO comment about moving runtime unmap to task runner
and provides cleaner separation between scheduler and execution concerns.

Includes pre-commit formatting fixes applied by ruff and ruff-format.
nothingmin pushed a commit to nothingmin/airflow that referenced this pull request Sep 2, 2025
Remove unnecessary unmap() functionality from server-side operators to
simplify scheduler architecture and eliminate synthetic operator creation.

Key changes:
- Remove unmap() method from MappedOperator class
- Update TaskInstance.fetch_handle_failure_context() to use original task directly
- Remove unmap() call from SerializedBaseOperator.get_extra_links()
- Update related tests to verify serialization without unmap functionality

The scheduler no longer needs to 'unmap' operators since:
- Callbacks are handled by DAG processor, not scheduler
- Email settings and fail-fast logic work with original task
- Extra links work consistently between regular and mapped operators

This eliminates the TODO comment about moving runtime unmap to task runner
and provides cleaner separation between scheduler and execution concerns.

Includes pre-commit formatting fixes applied by ruff and ruff-format.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Development

Successfully merging this pull request may close these issues.

3 participants