From b54646bf01eb26233e89803775dc7b06434382c3 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Tue, 8 Jul 2025 00:41:14 +0530 Subject: [PATCH 01/11] [v3-0-test] Fix DagRun list (#52986) (#52989) (cherry picked from commit 64d87e56350cb4806171f6a831eb1c6d254dd472) Co-authored-by: Pierre Jeambrun --- airflow-core/src/airflow/ui/src/pages/DagRuns.tsx | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/airflow-core/src/airflow/ui/src/pages/DagRuns.tsx b/airflow-core/src/airflow/ui/src/pages/DagRuns.tsx index b3faf701959e7..831727105bafd 100644 --- a/airflow-core/src/airflow/ui/src/pages/DagRuns.tsx +++ b/airflow-core/src/airflow/ui/src/pages/DagRuns.tsx @@ -22,7 +22,6 @@ import { Flex, HStack, Link, type SelectValueChangeDetails, Text } from "@chakra import type { ColumnDef } from "@tanstack/react-table"; import { useCallback } from "react"; import { Link as RouterLink, useParams, useSearchParams } from "react-router-dom"; -import { useLocalStorage } from "usehooks-ts"; import { useDagRunServiceGetDagRuns } from "openapi/queries"; import type { DAGRunResponse, DagRunState, DagRunType } from "openapi/requests/types.gen"; @@ -148,13 +147,12 @@ export const DagRuns = () => { const endDate = searchParams.get(END_DATE_PARAM); const refetchInterval = useAutoRefresh({}); - const [limit] = useLocalStorage(`dag_runs_limit-${dagId}`, 10); const { data, error, isLoading } = useDagRunServiceGetDagRuns( { dagId: dagId ?? "~", endDateLte: endDate ?? undefined, - limit, + limit: pagination.pageSize, offset: pagination.pageIndex * pagination.pageSize, orderBy, runType: filteredType === null ? 
undefined : [filteredType], From 45d04371ff3d4d96ded92e529555969f657bd787 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Tue, 8 Jul 2025 11:07:27 +0200 Subject: [PATCH 02/11] [v3-0-test] fix: escape $ in airflow-init to properly set AIRFLOW_UID (#52988) (#53007) (cherry picked from commit a30bd70ba45b1ebbecf34996af76bcb18ba17daf) Co-authored-by: Jun Jeong <121210928+givemechocopy@users.noreply.github.com> --- airflow-core/docs/howto/docker-compose/docker-compose.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow-core/docs/howto/docker-compose/docker-compose.yaml b/airflow-core/docs/howto/docker-compose/docker-compose.yaml index 17f2e93bf144b..024f39decf44e 100644 --- a/airflow-core/docs/howto/docker-compose/docker-compose.yaml +++ b/airflow-core/docs/howto/docker-compose/docker-compose.yaml @@ -217,7 +217,7 @@ services: echo "For other operating systems you can get rid of the warning with manually created .env file:" echo " See: https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#setting-the-right-airflow-user" echo - export AIRFLOW_UID=$(id -u) + export AIRFLOW_UID=$$(id -u) fi one_meg=1048576 mem_available=$$(($$(getconf _PHYS_PAGES) * $$(getconf PAGE_SIZE) / one_meg)) From a2fe830ec457c405ca4703d2d02177b4f00748ed Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Tue, 8 Jul 2025 11:33:51 +0200 Subject: [PATCH 03/11] [v3-0-test] Fix db downgrade check condition (#52982) (#53005) * fix rollback from airflow3 * use fab for test * amend * fix test * revert condition fix and add shortcut if revisions are same * updating revision to 2.10.3 in condition * remove fab from tests (cherry picked from commit d4585543477835e0c33aadbd64789d60e787390e) Co-authored-by: Rahul Vats <43964496+vatsrahul1001@users.noreply.github.com> --- airflow-core/src/airflow/utils/db.py | 2 +- 1 file 
changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow-core/src/airflow/utils/db.py b/airflow-core/src/airflow/utils/db.py index 4ab7226fcb487..f091f0868b0d5 100644 --- a/airflow-core/src/airflow/utils/db.py +++ b/airflow-core/src/airflow/utils/db.py @@ -1210,7 +1210,7 @@ def downgrade(*, to_revision, from_revision=None, show_sql_only=False, session: log.info("Attempting downgrade to revision %s", to_revision) config = _get_alembic_config() # Check if downgrade is less than 3.0.0 and requires that `ab_user` fab table is present - if _revision_greater(config, _REVISION_HEADS_MAP["3.0.0"], to_revision): + if _revision_greater(config, _REVISION_HEADS_MAP["2.10.3"], to_revision): unitest_mode = conf.getboolean("core", "unit_test_mode") if unitest_mode: try: From 58d389a74f3e5debc205734d81a8bebda232a10e Mon Sep 17 00:00:00 2001 From: Jarek Potiuk Date: Tue, 8 Jul 2025 12:20:44 +0200 Subject: [PATCH 04/11] docs: refresh Public Interface & align how-to guides for Airflow 3.0+ (#52297) (#53011) * docs: Update public interface documentation for Airflow 3.0+ for metadata direct access change * docs: replace direct metadata model imports in how-to examples with airflow.sdk * Make Public interface for Airflow 3 and add link for Airflow 2.11 * Fix PR comments * Update airflow-core/docs/public-airflow-interface.rst * Fix PR comments * Remove duplicates and remove RuntimeTaskInstanceProtocol as it is not public * Fix PR comments --------- (cherry picked from commit e8767b69f5ea6e3afc0419ef5dd1ba911046327b) Co-authored-by: Ankit Chaurasia <8670962+sunank200@users.noreply.github.com> Co-authored-by: Amogh Desai --- airflow-core/docs/conf.py | 10 - airflow-core/docs/core-concepts/params.rst | 2 +- airflow-core/docs/core-concepts/variables.rst | 14 + airflow-core/docs/core-concepts/xcoms.rst | 9 +- airflow-core/docs/howto/connection.rst | 6 +- airflow-core/docs/howto/custom-operator.rst | 2 +- .../docs/public-airflow-interface.rst | 324 +++++++++++++----- 7 files changed, 266 
insertions(+), 101 deletions(-) diff --git a/airflow-core/docs/conf.py b/airflow-core/docs/conf.py index 402d7db723089..545c95e712168 100644 --- a/airflow-core/docs/conf.py +++ b/airflow-core/docs/conf.py @@ -120,7 +120,6 @@ PACKAGES_THAT_WE_SHOULD_ADD_TO_API_DOCS = { "hooks", - "decorators", "example_dags", "executors", "operators", @@ -140,15 +139,6 @@ MODELS_THAT_SHOULD_BE_INCLUDED_IN_API_DOCS: set[str] = { "baseoperator.py", - "connection.py", - "dag.py", - "dagrun.py", - "dagbag.py", - "param.py", - "taskinstance.py", - "taskinstancekey.py", - "variable.py", - "xcom.py", } diff --git a/airflow-core/docs/core-concepts/params.rst b/airflow-core/docs/core-concepts/params.rst index 805da212d352a..f6d8a2c5c7a89 100644 --- a/airflow-core/docs/core-concepts/params.rst +++ b/airflow-core/docs/core-concepts/params.rst @@ -32,7 +32,7 @@ If the user-supplied values don't pass validation, Airflow shows a warning inste DAG-level Params ---------------- -To add Params to a :class:`~airflow.models.dag.DAG`, initialize it with the ``params`` kwarg. +To add Params to a :class:`~airflow.sdk.DAG`, initialize it with the ``params`` kwarg. Use a dictionary that maps Param names to either a :class:`~airflow.sdk.definitions.param.Param` or an object indicating the parameter's default value. .. code-block:: diff --git a/airflow-core/docs/core-concepts/variables.rst b/airflow-core/docs/core-concepts/variables.rst index db0ffacb0884f..6487fd0c131a3 100644 --- a/airflow-core/docs/core-concepts/variables.rst +++ b/airflow-core/docs/core-concepts/variables.rst @@ -33,6 +33,20 @@ To use them, just import and call ``get`` on the Variable model:: # Returns the value of default (None) if the variable is not set baz = Variable.get("baz", default=None) +You can also access variables through the Task Context using +:func:`~airflow.sdk.get_current_context`: + +.. 
code-block:: python + + from airflow.sdk import get_current_context + + + def my_task(): + context = get_current_context() + var = context["var"] + my_variable = var.get("my_variable_name") + return my_variable + You can also use them from :ref:`templates `:: # Raw value diff --git a/airflow-core/docs/core-concepts/xcoms.rst b/airflow-core/docs/core-concepts/xcoms.rst index 2be9b75bbf849..93463a752768e 100644 --- a/airflow-core/docs/core-concepts/xcoms.rst +++ b/airflow-core/docs/core-concepts/xcoms.rst @@ -25,6 +25,9 @@ XComs (short for "cross-communications") are a mechanism that let :doc:`tasks` t An XCom is identified by a ``key`` (essentially its name), as well as the ``task_id`` and ``dag_id`` it came from. They can have any serializable value (including objects that are decorated with ``@dataclass`` or ``@attr.define``, see :ref:`TaskFlow arguments `:), but they are only designed for small amounts of data; do not use them to pass around large values, like dataframes. +XCom operations should be performed through the Task Context using +:func:`~airflow.sdk.get_current_context`. Directly updating XComs through the XCom database model is not possible. + XComs are explicitly "pushed" and "pulled" to/from their storage using the ``xcom_push`` and ``xcom_pull`` methods on Task Instances. To push a value within a task called **"task-1"** that will be used by another task: @@ -73,8 +76,6 @@ An example of pushing multiple XComs and pulling them individually: # Pulling entire xcom data from push_multiple task data = context["ti"].xcom_pull(task_ids="push_multiple", key="return_value") - - .. note:: If the first task run is not succeeded then on every retry task XComs will be cleared to make the task run idempotent. @@ -91,7 +92,7 @@ Custom XCom Backends The XCom system has interchangeable backends, and you can set which backend is being used via the ``xcom_backend`` configuration option.
-If you want to implement your own backend, you should subclass :class:`~airflow.models.xcom.BaseXCom`, and override the ``serialize_value`` and ``deserialize_value`` methods. +If you want to implement your own backend, you should subclass :class:`~airflow.sdk.bases.xcom.BaseXCom`, and override the ``serialize_value`` and ``deserialize_value`` methods. You can override the ``purge`` method in the ``BaseXCom`` class to have control over purging the xcom data from the custom backend. This will be called as part of ``delete``. @@ -104,6 +105,6 @@ If you can exec into a terminal in an Airflow container, you can then print out .. code-block:: python - from airflow.models.xcom import XCom + from airflow.sdk.execution_time.xcom import XCom print(XCom.__name__) diff --git a/airflow-core/docs/howto/connection.rst b/airflow-core/docs/howto/connection.rst index 84aa1648b8224..e58d0260db49b 100644 --- a/airflow-core/docs/howto/connection.rst +++ b/airflow-core/docs/howto/connection.rst @@ -22,7 +22,7 @@ Managing Connections For an overview of hooks and connections, see :doc:`/authoring-and-scheduling/connections`. -Airflow's :class:`~airflow.models.connection.Connection` object is used for storing credentials and other information necessary for connecting to external services. +Airflow's :class:`~airflow.sdk.Connection` object is used for storing credentials and other information necessary for connecting to external services. Connections may be defined in the following ways: @@ -77,7 +77,7 @@ convenience property :py:meth:`~airflow.models.connection.Connection.as_json`. I .. code-block:: pycon - >>> from airflow.models.connection import Connection + >>> from airflow.sdk import Connection >>> c = Connection( ... conn_id="some_conn", ... conn_type="mysql", @@ -94,7 +94,7 @@ In addition, same approach could be used to convert Connection from URI format t .. 
code-block:: pycon - >>> from airflow.models.connection import Connection + >>> from airflow.sdk import Connection >>> c = Connection( ... conn_id="awesome_conn", ... description="Example Connection", diff --git a/airflow-core/docs/howto/custom-operator.rst b/airflow-core/docs/howto/custom-operator.rst index b76a2277fbfea..d6206166e1211 100644 --- a/airflow-core/docs/howto/custom-operator.rst +++ b/airflow-core/docs/howto/custom-operator.rst @@ -24,7 +24,7 @@ Creating a custom Operator Airflow allows you to create new operators to suit the requirements of you or your team. This extensibility is one of the many features which make Apache Airflow powerful. -You can create any operator you want by extending the :class:`airflow.models.baseoperator.BaseOperator` +You can create any operator you want by extending the public SDK base class :class:`~airflow.sdk.BaseOperator`. There are two methods that you need to override in a derived class: diff --git a/airflow-core/docs/public-airflow-interface.rst b/airflow-core/docs/public-airflow-interface.rst index aa5e3b5dc1bee..b0b7cfe5af23a 100644 --- a/airflow-core/docs/public-airflow-interface.rst +++ b/airflow-core/docs/public-airflow-interface.rst @@ -15,6 +15,17 @@ specific language governing permissions and limitations under the License. +**PUBLIC INTERFACE FOR AIRFLOW 3.0+** +===================================== + +.. warning:: + + **This documentation covers the Public Interface for Airflow 3.0+** + + If you are using Airflow 2.x, please refer to the + `Airflow 2.11 Public Interface Documentation `_ + for the legacy interface. + Public Interface of Airflow ........................... @@ -25,6 +36,14 @@ and extending Airflow capabilities by writing new executors, plugins, operators Public Interface can be useful for building custom tools and integrations with other systems, and for automating certain aspects of the Airflow workflow. 
+The primary public interface for DAG Authors and for task execution is the Airflow Task SDK, +exposed through the +:doc:`airflow.sdk namespace `. Direct access to the metadata database +from task code is no longer allowed. Instead, use the :doc:`Stable REST API `, +`Python Client `_, or Task Context methods. + +For comprehensive Task SDK documentation, see the `Task SDK Reference `_. + Using Airflow Public Interfaces =============================== @@ -56,13 +75,65 @@ way, the Stable REST API is recommended. Using the Public Interface for DAG Authors ========================================== +The primary interface for DAG Authors is the :doc:`airflow.sdk namespace `. +This provides a stable, well-defined interface for creating DAGs and tasks that is not subject to internal +implementation changes. The goal of this change is to decouple DAG authoring from Airflow internals (Scheduler, +API Server, etc.), providing a version-agnostic, stable interface for writing and maintaining DAGs across Airflow versions. + +**Key Imports from airflow.sdk:** + +**Classes:** + +* ``Asset`` +* ``BaseHook`` +* ``BaseNotifier`` +* ``BaseOperator`` +* ``BaseOperatorLink`` +* ``BaseSensorOperator`` +* ``Connection`` +* ``Context`` +* ``DAG`` +* ``EdgeModifier`` +* ``Label`` +* ``ObjectStoragePath`` +* ``Param`` +* ``TaskGroup`` +* ``Variable`` + +**Decorators and Functions:** + +* ``@asset`` +* ``@dag`` +* ``@setup`` +* ``@task`` +* ``@task_group`` +* ``@teardown`` +* ``chain`` +* ``chain_linear`` +* ``cross_downstream`` +* ``get_current_context`` +* ``get_parsing_context`` + +**Migration from Airflow 2.x:** + +For detailed migration instructions from Airflow 2.x to 3.x, including import changes and other breaking changes, +see the :doc:`Migration Guide `. + +For an exhaustive list of available classes, decorators, and functions, check ``airflow.sdk.__all__``.
+ +All DAGs should update imports to use ``airflow.sdk`` instead of referencing internal Airflow modules directly. +Legacy import paths (e.g., ``airflow.models.dag.DAG``, ``airflow.decorators.task``) are deprecated and will be +removed in a future Airflow version. + Dags ----- +==== The DAG is Airflow's core entity that represents a recurring workflow. You can create a DAG by -instantiating the :class:`~airflow.models.dag.DAG` class in your DAG file. You can also instantiate -them via :class:`~airflow.models.dagbag.DagBag` class that reads dags from a file or a folder. Dags -can also have parameters specified via :class:`~airflow.sdk.definitions.param.Param` class. +instantiating the :class:`~airflow.sdk.DAG` class in your DAG file. Dags can also have parameters +specified via the :class:`~airflow.sdk.Param` class. + +The recommended way to create DAGs is using the :func:`~airflow.sdk.dag` decorator +from the airflow.sdk namespace. Airflow has a set of example dags that you can use to learn how to write dags @@ -77,69 +148,86 @@ You can read more about dags in :doc:`Dags `. References for the modules used in dags are here: -.. toctree:: - :includehidden: - :glob: - :maxdepth: 1 +.. note:: + The airflow.sdk namespace provides the primary interface for DAG Authors. + For detailed API documentation, see the `Task SDK Reference `_. - _api/airflow/models/dag/index - _api/airflow/models/dagbag/index - _api/airflow/models/param/index +.. note:: + The :class:`~airflow.models.dagbag.DagBag` class is used internally by Airflow for loading DAGs + from files and folders. DAG Authors should use the :class:`~airflow.sdk.DAG` class from the + airflow.sdk namespace instead. -Properties of a :class:`~airflow.models.dagrun.DagRun` can also be referenced in things like :ref:`Templates `. - -.. toctree:: - :includehidden: - :glob: - :maxdepth: 1 - - _api/airflow/models/dagrun/index +..
note:: + The :class:`~airflow.models.dagrun.DagRun` class is used internally by Airflow for DAG run + management. DAG Authors should access DAG run information through the Task Context via + :func:`~airflow.sdk.get_current_context` or use the :class:`~airflow.sdk.types.DagRunProtocol` + interface. .. _pythonapi:operators: Operators ---------- +========= + +The base classes :class:`~airflow.sdk.BaseOperator` and :class:`~airflow.sdk.BaseSensorOperator` are public and may be extended to make new operators. -The base classes :class:`~airflow.models.baseoperator.BaseOperator` and :class:`~airflow.sensors.base.BaseSensorOperator` are public and may be extended to make new operators. +The base class for new operators is :class:`~airflow.sdk.BaseOperator` +from the airflow.sdk namespace. Subclasses of BaseOperator which are published in Apache Airflow are public in *behavior* but not in *structure*. That is to say, the Operator's parameters and behavior is governed by semver but the methods are subject to change at any time. Task Instances --------------- +============== -Task instances are the individual runs of a single task in a DAG (in a DAG Run). They are available in the context -passed to the execute method of the operators via the :class:`~airflow.models.taskinstance.TaskInstance` class. - -.. toctree:: - :includehidden: - :glob: - :maxdepth: 1 - - _api/airflow/models/taskinstance/index +Task instances are the individual runs of a single task in a DAG (in a DAG Run). Task instances are accessed through +the Task Context via :func:`~airflow.sdk.get_current_context`. Direct database access is not possible. +.. note:: + Task Context is part of the airflow.sdk namespace. + For detailed API documentation, see the `Task SDK Reference `_. Task Instance Keys ------------------- +================== Task instance keys are unique identifiers of task instances in a DAG (in a DAG Run). 
A key is a tuple that consists of -``dag_id``, ``task_id``, ``run_id``, ``try_number``, and ``map_index``. The key of a task instance can be retrieved via -:meth:`~airflow.models.taskinstance.TaskInstance.key`. +``dag_id``, ``task_id``, ``run_id``, ``try_number``, and ``map_index``. -.. toctree:: - :includehidden: - :glob: - :maxdepth: 1 +Direct access to task instance keys via the :class:`~airflow.models.taskinstance.TaskInstance` +model is no longer allowed from task code. Instead, use the Task Context via :func:`~airflow.sdk.get_current_context` +to access task instance information. + +Example of accessing task instance information through Task Context: + +.. code-block:: python + + from airflow.sdk import get_current_context + + + def my_task(): + context = get_current_context() + ti = context["ti"] + + dag_id = ti.dag_id + task_id = ti.task_id + run_id = ti.run_id + try_number = ti.try_number + map_index = ti.map_index + + print(f"Task: {dag_id}.{task_id}, Run: {run_id}, Try: {try_number}, Map Index: {map_index}") + +.. note:: + The :class:`~airflow.models.taskinstancekey.TaskInstanceKey` class is used internally by Airflow + for identifying task instances. DAG Authors should access task instance information through the + Task Context via :func:`~airflow.sdk.get_current_context` instead. - _api/airflow/models/taskinstancekey/index .. _pythonapi:hooks: Hooks ------ +===== Hooks are interfaces to external platforms and databases, implementing a common interface when possible and acting as building blocks for operators. All hooks -are derived from :class:`~airflow.hooks.base.BaseHook`. +are derived from :class:`~airflow.sdk.bases.hook.BaseHook`. Airflow has a set of Hooks that are considered public. 
You are free to extend their functionality by extending them: @@ -152,14 +240,44 @@ by extending them: _api/airflow/hooks/index Public Airflow utilities ------------------------- +======================== -When writing or extending Hooks and Operators, DAG authors and developers can +When writing or extending Hooks and Operators, DAG Authors and developers can use the following classes: -* The :class:`~airflow.models.connection.Connection`, which provides access to external service credentials and configuration. -* The :class:`~airflow.models.variable.Variable`, which provides access to Airflow configuration variables. -* The :class:`~airflow.models.xcom.XCom` which are used to access to inter-task communication data. +* The :class:`~airflow.sdk.Connection`, which provides access to external service credentials and configuration. +* The :class:`~airflow.sdk.Variable`, which provides access to Airflow configuration variables. +* The :class:`~airflow.sdk.execution_time.xcom.XCom`, which is used to access inter-task communication data. + +Connection and Variable operations should be performed through the Task Context using +:func:`~airflow.sdk.get_current_context` and the task instance's methods, or through the airflow.sdk namespace. +Direct database access to :class:`~airflow.models.connection.Connection` and :class:`~airflow.models.variable.Variable` +models is no longer allowed from task code. + +Example of accessing Connections and Variables through Task Context: + +.. code-block:: python + + from airflow.sdk import get_current_context + + + def my_task(): + context = get_current_context() + + conn = context["conn"] + my_connection = conn.get("my_connection_id") + + var = context["var"] + my_variable = var.value.get("my_variable_name") + +Example of using the airflow.sdk namespace directly: + +..
code-block:: python + + from airflow.sdk import Connection, Variable + + conn = Connection.get("my_connection_id") + var = Variable.get("my_variable_name") You can read more about the public Airflow utilities in :doc:`howto/connection`, :doc:`core-concepts/variables`, :doc:`core-concepts/xcoms` @@ -167,18 +285,13 @@ You can read more about the public Airflow utilities in :doc:`howto/connection`, Reference for classes used for the utilities are here: -.. toctree:: - :includehidden: - :glob: - :maxdepth: 1 - - _api/airflow/models/connection/index - _api/airflow/models/variable/index - _api/airflow/models/xcom/index +.. note:: + Connection, Variable, and XCom classes are now part of the airflow.sdk namespace. + For detailed API documentation, see the `Task SDK Reference `_. Public Exceptions ------------------ +================= When writing the custom Operators and Hooks, you can handle and raise public Exceptions that Airflow exposes: @@ -191,7 +304,7 @@ exposes: _api/airflow/exceptions/index Public Utility classes ----------------------- +====================== .. toctree:: :includehidden: @@ -215,7 +328,7 @@ that do not require plugins - you can read more about them in :doc:`howto/custom Here are the ways how Plugins can be used to extend Airflow: Triggers --------- +======== Airflow uses Triggers to implement ``asyncio`` compatible Deferrable Operators. All Triggers derive from :class:`~airflow.triggers.base.BaseTrigger`. @@ -233,7 +346,7 @@ by extending them: You can read more about Triggers in :doc:`authoring-and-scheduling/deferring`. Timetables ----------- +========== Custom timetable implementations provide Airflow's scheduler additional logic to schedule DAG runs in ways not possible with built-in schedule expressions. @@ -251,7 +364,7 @@ by extending them: You can read more about Timetables in :doc:`howto/timetable`. Listeners ---------- +========= Listeners enable you to respond to DAG/Task lifecycle events. 
@@ -264,11 +377,8 @@ can be implemented to respond to DAG/Task lifecycle events. You can read more about Listeners in :doc:`administration-and-deployment/listeners`. -.. - TODO AIP-72: This class has been moved to task sdk but we cannot add a doc reference for it yet because task sdk doesn't have rendered docs yet. - Extra Links ------------ +=========== Extra links are dynamic links that could be added to Airflow independently from custom Operators. Normally they can be defined by the Operators, but plugins allow you to override the links on a global level. @@ -285,7 +395,7 @@ You can read more about providers :doc:`providers `. Executors ---------- +========= Executors are the mechanism by which task instances get run. All executors are derived from :class:`~airflow.executors.base_executor.BaseExecutor`. There are several @@ -305,10 +415,10 @@ You can read more about executors and how to write your own in :doc:`core-concep executors, and custom executors could not provide full functionality that built-in executors had. Secrets Backends ----------------- +================ Airflow can be configured to rely on secrets backends to retrieve -:class:`~airflow.models.connection.Connection` and :class:`~airflow.models.variable.Variable`. +:class:`~airflow.sdk.Connection` and :class:`~airflow.sdk.Variable`. All secrets backends derive from :class:`~airflow.secrets.base_secrets.BaseSecretsBackend`. All Secrets Backend implementations are public. You can extend their functionality: @@ -325,7 +435,7 @@ You can also find all the available Secrets Backends implemented in community pr in :doc:`apache-airflow-providers:core-extensions/secrets-backends`. Auth managers -------------- +============= Auth managers are responsible of user authentication and user authorization in Airflow. All auth managers are derived from :class:`~airflow.api_fastapi.auth.managers.base_auth_manager.BaseAuthManager`. 
@@ -336,21 +446,21 @@ public, but the different implementations of auth managers are not (i.e. FabAuth You can read more about auth managers and how to write your own in :doc:`core-concepts/auth-manager/index`. Connections ------------ +=========== When creating Hooks, you can add custom Connections. You can read more about connections in :doc:`apache-airflow-providers:core-extensions/connections` for available Connections implemented in the community providers. Extra Links ------------ +=========== When creating Hooks, you can add custom Extra Links that are displayed when the tasks are run. You can find out more about extra links in :doc:`apache-airflow-providers:core-extensions/extra-links` that also shows available extra links implemented in the community providers. Logging and Monitoring ----------------------- +====================== You can extend the way how logs are written by Airflow. You can find out more about log writing in :doc:`administration-and-deployment/logging-monitoring/index`. @@ -359,40 +469,44 @@ The :doc:`apache-airflow-providers:core-extensions/logging` that also shows avai implemented in the community providers. Decorators ----------- -DAG authors can use decorators to author dags using the :doc:`TaskFlow ` concept. -All Decorators derive from :class:`~airflow.decorators.base.TaskDecorator`. +========== +DAG Authors can use decorators to author dags using the :doc:`TaskFlow ` concept. +All Decorators derive from :class:`~airflow.sdk.bases.decorator.TaskDecorator`. + +The primary decorators for DAG Authors are now in the airflow.sdk namespace: +:func:`~airflow.sdk.dag`, :func:`~airflow.sdk.task`, :func:`~airflow.sdk.asset`, +:func:`~airflow.sdk.setup`, :func:`~airflow.sdk.task_group`, :func:`~airflow.sdk.teardown`, +:func:`~airflow.sdk.chain`, :func:`~airflow.sdk.chain_linear`, :func:`~airflow.sdk.cross_downstream`, +:func:`~airflow.sdk.get_current_context` and :func:`~airflow.sdk.get_parsing_context`. 
Airflow has a set of Decorators that are considered public. You are free to extend their functionality by extending them: -.. toctree:: - :includehidden: - :maxdepth: 1 - - _api/airflow/decorators/index +.. note:: + Decorators are now part of the airflow.sdk namespace. + For detailed API documentation, see the `Task SDK Reference `_. You can read more about creating custom Decorators in :doc:`howto/create-custom-decorator`. Email notifications -------------------- +=================== Airflow has a built-in way of sending email notifications and it allows to extend it by adding custom email notification classes. You can read more about email notifications in :doc:`howto/email-config`. Notifications -------------- +============= Airflow has a built-in extensible way of sending notifications using the various ``on_*_callback``. You can read more about notifications in :doc:`howto/notifications`. Cluster Policies ----------------- +================ Cluster Policies are the way to dynamically apply cluster-wide policies to the dags being parsed or tasks being executed. You can read more about Cluster Policies in :doc:`administration-and-deployment/cluster-policies`. Lineage -------- +======= Airflow can help track origins of data, what happens to it and where it moves over time. You can read more about lineage in :doc:`administration-and-deployment/lineage`. @@ -418,3 +532,49 @@ but in Airflow they are not parts of the Public Interface and might change any t * Python classes except those explicitly mentioned in this document, are considered an internal implementation detail and you should not assume they will be maintained in a backwards-compatible way. + +**Direct metadata database access from task code is no longer allowed**. +Task code cannot directly access the metadata database to query DAG state, task history, +or DAG runs. 
Instead, use one of the following alternatives: + +* **Task Context**: Use :func:`~airflow.sdk.get_current_context` to access task instance + information and methods like :meth:`~airflow.sdk.types.RuntimeTaskInstanceProtocol.get_dr_count`, + :meth:`~airflow.sdk.types.RuntimeTaskInstanceProtocol.get_dagrun_state`, and + :meth:`~airflow.sdk.types.RuntimeTaskInstanceProtocol.get_task_states`. + +* **REST API**: Use the :doc:`Stable REST API ` for programmatic + access to Airflow metadata. + +* **Python Client**: Use the `Python Client `_ for Python-based + interactions with Airflow. + +This change improves architectural separation and enables remote execution capabilities. + +Example of using Task Context instead of direct database access: + +.. code-block:: python + + from airflow.sdk import dag, get_current_context, task + from airflow.utils.state import DagRunState + from datetime import datetime + + + @dag(dag_id="example_dag", start_date=datetime(2025, 1, 1), schedule="@hourly", tags=["misc"], catchup=False) + def example_dag(): + + @task(task_id="check_dagrun_state") + def check_state(): + context = get_current_context() + ti = context["ti"] + dag_run = context["dag_run"] + + # Use Task Context methods instead of direct DB access + dr_count = ti.get_dr_count(dag_id="example_dag") + dagrun_state = ti.get_dagrun_state(dag_id="example_dag", run_id=dag_run.run_id) + + return f"DAG run count: {dr_count}, current state: {dagrun_state}" + + check_state() + + + example_dag() From 76e9e78dadd2f393bf32d292e434ec93f981be3d Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Tue, 8 Jul 2025 10:29:46 -0400 Subject: [PATCH 05/11] [v3-0-test] Fix log for skipped task (#53024) (#53028) * [v3-0-test] Fix log for skipped tasks (#53024) (cherry picked from commit 0d6e4172e6e4eccabd5c8b12606c10cfd23ee58b) Co-authored-by: Pierre Jeambrun * Fix CI --------- Co-authored-by: Pierre Jeambrun ---
.../openapi/v2-rest-api-generated.yaml | 2 +- .../api_fastapi/core_api/routes/public/log.py | 4 +- .../ui/src/pages/TaskInstance/Logs/Logs.tsx | 2 +- .../airflow/utils/log/file_task_handler.py | 9 +++++ .../tests/unit/utils/test_log_handlers.py | 37 +++++++++++++++++++ 5 files changed, 50 insertions(+), 4 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml index 7478465911042..330861ec1d22e 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml +++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml @@ -6323,7 +6323,7 @@ paths: required: true schema: type: integer - exclusiveMinimum: 0 + minimum: 0 title: Try Number - name: full_content in: query diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/log.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/log.py index de7b327b6a4f1..1a7242c9936e3 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/log.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/log.py @@ -22,7 +22,7 @@ from fastapi import Depends, HTTPException, Request, Response, status from itsdangerous import BadSignature, URLSafeSerializer -from pydantic import PositiveInt +from pydantic import NonNegativeInt from sqlalchemy.orm import joinedload from sqlalchemy.sql import select @@ -75,7 +75,7 @@ def get_log( dag_id: str, dag_run_id: str, task_id: str, - try_number: PositiveInt, + try_number: NonNegativeInt, accept: HeaderAcceptJsonOrNdjson, request: Request, dag_bag: DagBagDep, diff --git a/airflow-core/src/airflow/ui/src/pages/TaskInstance/Logs/Logs.tsx b/airflow-core/src/airflow/ui/src/pages/TaskInstance/Logs/Logs.tsx index 3b9864a6db3ad..5b84591a605c5 100644 --- a/airflow-core/src/airflow/ui/src/pages/TaskInstance/Logs/Logs.tsx +++ 
b/airflow-core/src/airflow/ui/src/pages/TaskInstance/Logs/Logs.tsx @@ -84,7 +84,7 @@ export const Logs = () => { logLevelFilters, sourceFilters, taskInstance, - tryNumber: tryNumber === 0 ? 1 : tryNumber, + tryNumber, }); return ( diff --git a/airflow-core/src/airflow/utils/log/file_task_handler.py b/airflow-core/src/airflow/utils/log/file_task_handler.py index db18e47f13f93..89bdf385ee80f 100644 --- a/airflow-core/src/airflow/utils/log/file_task_handler.py +++ b/airflow-core/src/airflow/utils/log/file_task_handler.py @@ -482,6 +482,15 @@ def read( """ if try_number is None: try_number = task_instance.try_number + + if task_instance.state == TaskInstanceState.SKIPPED: + logs = [ + StructuredLogMessage( # type: ignore[call-arg] + event="Task was skipped, no logs available." + ) + ] + return logs, {"end_of_log": True} + if try_number is None or try_number < 1: logs = [ StructuredLogMessage( # type: ignore[call-arg] diff --git a/airflow-core/tests/unit/utils/test_log_handlers.py b/airflow-core/tests/unit/utils/test_log_handlers.py index 5813c22819eec..286eae5c84255 100644 --- a/airflow-core/tests/unit/utils/test_log_handlers.py +++ b/airflow-core/tests/unit/utils/test_log_handlers.py @@ -151,6 +151,43 @@ def task_callable(ti): # Remove the generated tmp log file. 
os.remove(log_filename) + def test_file_task_handler_when_ti_is_skipped(self, dag_maker): + def task_callable(ti): + ti.log.info("test") + + with dag_maker("dag_for_testing_file_task_handler", schedule=None): + task = PythonOperator( + task_id="task_for_testing_file_log_handler", + python_callable=task_callable, + ) + dagrun = dag_maker.create_dagrun() + ti = TaskInstance(task=task, run_id=dagrun.run_id) + + ti.try_number = 0 + ti.state = State.SKIPPED + + logger = ti.log + ti.log.disabled = False + + file_handler = next( + (handler for handler in logger.handlers if handler.name == FILE_TASK_HANDLER), None + ) + assert file_handler is not None + + set_context(logger, ti) + assert file_handler.handler is not None + # We expect set_context generates a file locally. + log_filename = file_handler.handler.baseFilename + assert os.path.isfile(log_filename) + assert log_filename.endswith("0.log"), log_filename + + # Return value of read must be a tuple of list and list. + logs, metadata = file_handler.read(ti) + assert logs[0].event == "Task was skipped, no logs available." + + # Remove the generated tmp log file. 
+ os.remove(log_filename) + @pytest.mark.xfail(reason="TODO: Needs to be ported over to the new structlog based logging") def test_file_task_handler(self, dag_maker, session): def task_callable(ti): From 1fc4c781b69bab0255fb11aa430da4e8f0b1dc3c Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Wed, 9 Jul 2025 14:19:52 +0200 Subject: [PATCH 06/11] [v3-0-test] Invalidate grid runs and summaries and TI/Runs actions (#53081) (#53086) (cherry picked from commit 3eaf4e90b69588e5022a0ead63ba0bc8c6382ed1) Co-authored-by: Pierre Jeambrun --- airflow-core/src/airflow/ui/src/queries/useClearRun.ts | 4 ++++ .../src/airflow/ui/src/queries/useClearTaskInstances.ts | 4 ++++ airflow-core/src/airflow/ui/src/queries/usePatchDagRun.ts | 4 ++++ .../src/airflow/ui/src/queries/usePatchTaskInstance.ts | 4 ++++ 4 files changed, 16 insertions(+) diff --git a/airflow-core/src/airflow/ui/src/queries/useClearRun.ts b/airflow-core/src/airflow/ui/src/queries/useClearRun.ts index 1c7a80d2a9903..6d19f10ce843d 100644 --- a/airflow-core/src/airflow/ui/src/queries/useClearRun.ts +++ b/airflow-core/src/airflow/ui/src/queries/useClearRun.ts @@ -25,6 +25,8 @@ import { UseDagServiceGetDagDetailsKeyFn, UseGridServiceGridDataKeyFn, useTaskInstanceServiceGetTaskInstancesKey, + UseGridServiceGetGridRunsKeyFn, + UseGridServiceGetGridTiSummariesKeyFn, } from "openapi/queries"; import { toaster } from "src/components/ui"; @@ -57,6 +59,8 @@ export const useClearDagRun = ({ [useDagRunServiceGetDagRunsKey], [useClearDagRunDryRunKey, dagId], UseGridServiceGridDataKeyFn({ dagId }, [{ dagId }]), + UseGridServiceGetGridRunsKeyFn({ dagId }, [{ dagId }]), + UseGridServiceGetGridTiSummariesKeyFn({ dagId, runId: dagRunId }, [{ dagId, runId: dagRunId }]), ]; await Promise.all(queryKeys.map((key) => queryClient.invalidateQueries({ queryKey: key }))); diff --git a/airflow-core/src/airflow/ui/src/queries/useClearTaskInstances.ts 
b/airflow-core/src/airflow/ui/src/queries/useClearTaskInstances.ts index 205f6faa0722a..f0ed037858177 100644 --- a/airflow-core/src/airflow/ui/src/queries/useClearTaskInstances.ts +++ b/airflow-core/src/airflow/ui/src/queries/useClearTaskInstances.ts @@ -24,6 +24,8 @@ import { UseGridServiceGridDataKeyFn, UseTaskInstanceServiceGetMappedTaskInstanceKeyFn, useTaskInstanceServicePostClearTaskInstances, + UseGridServiceGetGridRunsKeyFn, + UseGridServiceGetGridTiSummariesKeyFn, } from "openapi/queries"; import type { ClearTaskInstancesBody, TaskInstanceCollectionResponse } from "openapi/requests/types.gen"; import { toaster } from "src/components/ui"; @@ -82,6 +84,8 @@ export const useClearTaskInstances = ({ [useClearTaskInstancesDryRunKey, dagId], [usePatchTaskInstanceDryRunKey, dagId, dagRunId], UseGridServiceGridDataKeyFn({ dagId }, [{ dagId }]), + UseGridServiceGetGridRunsKeyFn({ dagId }, [{ dagId }]), + UseGridServiceGetGridTiSummariesKeyFn({ dagId, runId: dagRunId }, [{ dagId, runId: dagRunId }]), ]; await Promise.all(queryKeys.map((key) => queryClient.invalidateQueries({ queryKey: key }))); diff --git a/airflow-core/src/airflow/ui/src/queries/usePatchDagRun.ts b/airflow-core/src/airflow/ui/src/queries/usePatchDagRun.ts index 6ca008b6dc137..859199a935249 100644 --- a/airflow-core/src/airflow/ui/src/queries/usePatchDagRun.ts +++ b/airflow-core/src/airflow/ui/src/queries/usePatchDagRun.ts @@ -24,6 +24,8 @@ import { useDagRunServicePatchDagRun, UseGridServiceGridDataKeyFn, useTaskInstanceServiceGetTaskInstancesKey, + UseGridServiceGetGridRunsKeyFn, + UseGridServiceGetGridTiSummariesKeyFn, } from "openapi/queries"; import { toaster } from "src/components/ui"; @@ -55,6 +57,8 @@ export const usePatchDagRun = ({ [useTaskInstanceServiceGetTaskInstancesKey, { dagId, dagRunId }], [useClearDagRunDryRunKey, dagId], UseGridServiceGridDataKeyFn({ dagId }, [{ dagId }]), + UseGridServiceGetGridRunsKeyFn({ dagId }, [{ dagId }]), + UseGridServiceGetGridTiSummariesKeyFn({ dagId, 
runId: dagRunId }, [{ dagId, runId: dagRunId }]), ]; await Promise.all(queryKeys.map((key) => queryClient.invalidateQueries({ queryKey: key }))); diff --git a/airflow-core/src/airflow/ui/src/queries/usePatchTaskInstance.ts b/airflow-core/src/airflow/ui/src/queries/usePatchTaskInstance.ts index 8cd6a7c36cb48..988e24c72c24b 100644 --- a/airflow-core/src/airflow/ui/src/queries/usePatchTaskInstance.ts +++ b/airflow-core/src/airflow/ui/src/queries/usePatchTaskInstance.ts @@ -24,6 +24,8 @@ import { UseTaskInstanceServiceGetTaskInstanceKeyFn, useTaskInstanceServiceGetTaskInstancesKey, useTaskInstanceServicePatchTaskInstance, + UseGridServiceGetGridRunsKeyFn, + UseGridServiceGetGridTiSummariesKeyFn, } from "openapi/queries"; import { toaster } from "src/components/ui"; @@ -61,6 +63,8 @@ export const usePatchTaskInstance = ({ [usePatchTaskInstanceDryRunKey, dagId, dagRunId, { mapIndex, taskId }], [useClearTaskInstancesDryRunKey, dagId], UseGridServiceGridDataKeyFn({ dagId }, [{ dagId }]), + UseGridServiceGetGridRunsKeyFn({ dagId }, [{ dagId }]), + UseGridServiceGetGridTiSummariesKeyFn({ dagId, runId: dagRunId }, [{ dagId, runId: dagRunId }]), ]; await Promise.all(queryKeys.map((key) => queryClient.invalidateQueries({ queryKey: key }))); From 97e9fc14cc8aeefc1f88b2930d75cfff4200725a Mon Sep 17 00:00:00 2001 From: Pierre Jeambrun Date: Wed, 9 Jul 2025 20:10:19 +0200 Subject: [PATCH 07/11] [v3-0-test] Fix no_status and duration for grid summaries #53087 (#53092) * [v3-0-test] Fix no_status and duration for grid summaries #53087 * Fix CI --- .../api_fastapi/core_api/datamodels/ui/common.py | 8 +++++--- .../src/airflow/api_fastapi/core_api/routes/ui/grid.py | 1 - .../airflow/api_fastapi/core_api/services/ui/grid.py | 2 +- .../unit/api_fastapi/core_api/routes/ui/test_grid.py | 10 +++++----- 4 files changed, 11 insertions(+), 10 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/common.py 
b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/common.py index 0f315326194e5..4fedad08e1c27 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/common.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/common.py @@ -23,6 +23,7 @@ from pydantic import computed_field from airflow.api_fastapi.core_api.base import BaseModel +from airflow.utils import timezone from airflow.utils.state import TaskInstanceState from airflow.utils.types import DagRunType @@ -81,9 +82,10 @@ class GridRunsResponse(BaseModel): @computed_field def duration(self) -> int | None: - if self.start_date and self.end_date: - return (self.end_date - self.start_date).seconds - return None + if self.start_date: + end_date = self.end_date or timezone.utcnow() + return (end_date - self.start_date).seconds + return 0 class BaseGraphResponse(BaseModel, Generic[E, N]): diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py index ce1a582a511f7..6af0f4e85c65d 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py @@ -480,7 +480,6 @@ def get_grid_runs( ) ), ], - response_model_exclude_none=True, ) def get_grid_ti_summaries( dag_id: str, diff --git a/airflow-core/src/airflow/api_fastapi/core_api/services/ui/grid.py b/airflow-core/src/airflow/api_fastapi/core_api/services/ui/grid.py index 04b031455a63f..73f70ea3f7e0e 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/services/ui/grid.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/services/ui/grid.py @@ -323,7 +323,7 @@ def agg_state(states): for state in state_priority: if state in states: return state - return "no_status" + return None def _find_aggregates( diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_grid.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_grid.py index 
908ebb24bebf4..9196fb74b10c2 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_grid.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_grid.py @@ -1251,13 +1251,13 @@ def sort_dict(in_dict): expected = [ {"task_id": "mapped_task_group", "state": "running"}, - {"task_id": "task_group.inner_task_group"}, - {"task_id": "task_group"}, - {"task_id": "mapped_task_2"}, + {"task_id": "task_group.inner_task_group", "state": None}, + {"task_id": "task_group", "state": None}, + {"task_id": "mapped_task_2", "state": None}, {"task_id": "mapped_task_group.subtask", "state": "running"}, {"task_id": "task", "state": "success"}, - {"task_id": "task_group.inner_task_group.inner_task_group_sub_task"}, - {"task_id": "task_group.mapped_task"}, + {"task_id": "task_group.inner_task_group.inner_task_group_sub_task", "state": None}, + {"task_id": "task_group.mapped_task", "state": None}, ] expected = sort_dict(expected) actual = sort_dict(actual) From 4be0475ce4f05b2d1528f906b5ab59d24c844c51 Mon Sep 17 00:00:00 2001 From: ayush3singh Date: Fri, 16 May 2025 14:13:06 +0530 Subject: [PATCH 08/11] Fixes issue RuntimeTaskInstance does not contain log_url | added during taskrunner startup (#50376) (cherry picked from commit 330789ecd8bd3df4c5760f5b6172a10c77b6b0ef) --- .../airflow/sdk/execution_time/task_runner.py | 20 +++++++++++++++++++ .../execution_time/test_task_runner.py | 3 +++ 2 files changed, 23 insertions(+) diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py b/task-sdk/src/airflow/sdk/execution_time/task_runner.py index 5fa868b9b88bd..6c6e597f65e5c 100644 --- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py +++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py @@ -144,6 +144,8 @@ class RuntimeTaskInstance(TaskInstance): rendered_map_index: str | None = None + log_url: str | None = None + def __rich_repr__(self): yield "id", self.id yield "task_id", self.task_id @@ -549,6 +551,23 @@ def _xcom_push_to_db(ti: 
RuntimeTaskInstance, key: str, value: Any) -> None: ) +def get_log_url_from_ti(ti: RuntimeTaskInstance) -> str: + from urllib.parse import quote + + from airflow.configuration import conf + + run_id = quote(ti.run_id) + base_url = conf.get("api", "base_url", fallback="http://localhost:8080/") + map_index_value = getattr(ti, "map_index", -1) + map_index = f"/mapped/{map_index_value}" if map_index_value is not None and map_index_value >= 0 else "" + try_number_value = getattr(ti, "try_number", 0) + try_number = ( + f"?try_number={try_number_value}" if try_number_value is not None and try_number_value > 0 else "" + ) + _log_uri = f"{base_url}dags/{ti.dag_id}/runs/{run_id}/tasks/{ti.task_id}{map_index}{try_number}" + return _log_uri + + def parse(what: StartupDetails, log: Logger) -> RuntimeTaskInstance: # TODO: Task-SDK: # Using DagBag here is about 98% wrong, but it'll do for now @@ -677,6 +696,7 @@ def startup() -> tuple[RuntimeTaskInstance, Context, Logger]: with _airflow_parsing_context_manager(dag_id=msg.ti.dag_id, task_id=msg.ti.task_id): ti = parse(msg, log) + ti.log_url = get_log_url_from_ti(ti) log.debug("DAG file parsed", file=msg.dag_rel_path) run_as_user = getattr(ti.task, "run_as_user", None) or conf.get( diff --git a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py index 180068f801b8f..f2ff89670818b 100644 --- a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py +++ b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py @@ -106,6 +106,7 @@ _push_xcom_if_needed, _xcom_push, finalize, + get_log_url_from_ti, parse, run, startup, @@ -2206,6 +2207,8 @@ def execute(self, context): mocked_parse(what, "basic_dag", task) runtime_ti, context, log = startup() + assert runtime_ti is not None + assert runtime_ti.log_url == get_log_url_from_ti(runtime_ti) assert isinstance(listener.component, TaskRunnerMarker) del listener.component From aaeb727636d89f1590b93024b54cc4747c6d7bb2 Mon 
Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Thu, 10 Jul 2025 00:38:22 +0530 Subject: [PATCH 09/11] [v3-0-test] Fix log for skipped tasks follow up (#53075) (#53101) (cherry picked from commit b052f844907e7f3a9e4a3f8826b0f4525fafceb2) Co-authored-by: Pierre Jeambrun --- airflow-core/src/airflow/utils/log/file_task_handler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow-core/src/airflow/utils/log/file_task_handler.py b/airflow-core/src/airflow/utils/log/file_task_handler.py index 89bdf385ee80f..66d3f95f92ee6 100644 --- a/airflow-core/src/airflow/utils/log/file_task_handler.py +++ b/airflow-core/src/airflow/utils/log/file_task_handler.py @@ -483,7 +483,7 @@ def read( if try_number is None: try_number = task_instance.try_number - if task_instance.state == TaskInstanceState.SKIPPED: + if try_number == 0 and task_instance.state == TaskInstanceState.SKIPPED: logs = [ StructuredLogMessage( # type: ignore[call-arg] event="Task was skipped, no logs available." From 7908123d8b1c11d09fe4ea8c59f2be9da75e890e Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Thu, 10 Jul 2025 00:56:10 +0530 Subject: [PATCH 10/11] [v3-0-test] Deserialize response of `get_all` when we call `XCom.get_all` (#53020) (#53102) (cherry picked from commit bdc9cd115d103614159ea7492db8ff16607c6959) Co-authored-by: Amogh Desai --- task-sdk/src/airflow/sdk/bases/xcom.py | 6 ++- .../airflow/sdk/execution_time/task_runner.py | 12 +++-- .../execution_time/test_task_runner.py | 53 +++++++++++++++++++ 3 files changed, 65 insertions(+), 6 deletions(-) diff --git a/task-sdk/src/airflow/sdk/bases/xcom.py b/task-sdk/src/airflow/sdk/bases/xcom.py index 770dbf53df769..82df8d151ab13 100644 --- a/task-sdk/src/airflow/sdk/bases/xcom.py +++ b/task-sdk/src/airflow/sdk/bases/xcom.py @@ -290,6 +290,7 @@ def get_all( :return: List of all XCom values if found. 
""" from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS + from airflow.serialization.serde import deserialize msg = SUPERVISOR_COMMS.send( msg=GetXComSequenceSlice( @@ -306,7 +307,10 @@ def get_all( if not isinstance(msg, XComSequenceSliceResult): raise TypeError(f"Expected XComSequenceSliceResult, received: {type(msg)} {msg}") - return msg.root + result = deserialize(msg.root) + if not result: + return None + return result @staticmethod def serialize_value( diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py b/task-sdk/src/airflow/sdk/execution_time/task_runner.py index 6c6e597f65e5c..bdd3b240dafb8 100644 --- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py +++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py @@ -351,17 +351,19 @@ def xcom_pull( # If map_indexes is not specified, pull xcoms from all map indexes for each task if isinstance(map_indexes, ArgNotSet): - xcoms = [ - value - for t_id in task_ids - for value in XCom.get_all( + xcoms: list[Any] = [] + for t_id in task_ids: + values = XCom.get_all( run_id=run_id, key=key, task_id=t_id, dag_id=dag_id, ) - ] + if values is None: + xcoms.append(None) + else: + xcoms.extend(values) # For single task pulling from unmapped task, return single value if single_task_requested and len(xcoms) == 1: return xcoms[0] diff --git a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py index f2ff89670818b..362b89e3ad0a5 100644 --- a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py +++ b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py @@ -1355,6 +1355,7 @@ def test_get_variable_from_context( pytest.param("hello", id="string_value"), pytest.param("'hello'", id="quoted_string_value"), pytest.param({"key": "value"}, id="json_value"), + pytest.param([], id="empty_list_no_xcoms_found"), pytest.param((1, 2, 3), id="tuple_int_value"), pytest.param([1, 2, 3], id="list_int_value"), pytest.param(42, 
id="int_value"), @@ -1377,6 +1378,9 @@ def test_xcom_pull( """ map_indexes_kwarg = {} if map_indexes is NOTSET else {"map_indexes": map_indexes} task_ids_kwarg = {} if task_ids is NOTSET else {"task_ids": task_ids} + from airflow.serialization.serde import deserialize + + spy_agency.spy_on(deserialize) class CustomOperator(BaseOperator): def execute(self, context): @@ -1402,6 +1406,7 @@ def mock_send_side_effect(*args, **kwargs): mock_supervisor_comms.send.side_effect = mock_send_side_effect run(runtime_ti, context=runtime_ti.get_template_context(), log=mock.MagicMock()) + spy_agency.assert_spy_called_with(deserialize, ser_value) if not isinstance(task_ids, Iterable) or isinstance(task_ids, str): task_ids = [task_ids] @@ -1507,6 +1512,54 @@ def mock_get_all_side_effect(task_id, **kwargs): assert mock_get_one.called assert not mock_get_all.called + @pytest.mark.parametrize( + "api_return_value", + [ + pytest.param(("data", "test_value"), id="api returns tuple"), + pytest.param({"data": "test_value"}, id="api returns dict"), + pytest.param(None, id="api returns None, no xcom found"), + ], + ) + def test_xcom_pull_with_no_map_index( + self, + api_return_value, + create_runtime_ti, + mock_supervisor_comms, + ): + """ + Test xcom_pull when map_indexes is not specified, so that XCom.get_all is called. + The test also tests if the response is deserialized and returned. 
+ """ + test_task_id = "pull_task" + task = BaseOperator(task_id=test_task_id) + runtime_ti = create_runtime_ti(task=task) + + ser_value = BaseXCom.serialize_value(api_return_value) + + def mock_send_side_effect(*args, **kwargs): + msg = kwargs.get("msg") or args[0] + if isinstance(msg, GetXComSequenceSlice): + return XComSequenceSliceResult(root=[ser_value]) + return XComResult(key="test_key", value=None) + + mock_supervisor_comms.send.side_effect = mock_send_side_effect + result = runtime_ti.xcom_pull(key="test_key", task_ids="task_a") + + # if the API returns a tuple or dict, the below assertion assures that the value is deserialized correctly by XCom.get_all + assert result == api_return_value + + mock_supervisor_comms.send.assert_called_once_with( + msg=GetXComSequenceSlice( + key="test_key", + dag_id=runtime_ti.dag_id, + run_id=runtime_ti.run_id, + task_id="task_a", + start=None, + stop=None, + step=None, + ), + ) + def test_get_param_from_context( self, mocked_parse, make_ti_context, mock_supervisor_comms, create_runtime_ti ): From d5ab8ffd99f2e9f9ed6f3c949046b4f15aa0e8c5 Mon Sep 17 00:00:00 2001 From: Kaxil Naik Date: Thu, 10 Jul 2025 00:58:17 +0530 Subject: [PATCH 11/11] Update release notes for 3.0.3rc5 --- RELEASE_NOTES.rst | 10 +++++++++- reproducible_build.yaml | 4 ++-- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/RELEASE_NOTES.rst b/RELEASE_NOTES.rst index 6554068974c1c..a91ec90d8db26 100644 --- a/RELEASE_NOTES.rst +++ b/RELEASE_NOTES.rst @@ -24,7 +24,7 @@ .. 
towncrier release notes start -Airflow 3.0.3 (2025-07-08) +Airflow 3.0.3 (2025-07-14) -------------------------- Significant Changes @@ -80,6 +80,13 @@ Bug Fixes - Fixing bad cadwyn migration for upstream map indexes (#52797) - Run trigger expansion logic only when ``start_from_trigger`` is True (#52873) - Fix example dag ``example_external_task_parent_deferrable.py`` imports (#52957) +- Fixes pagination in DAG run lists (#52989) +- Fix db downgrade check condition (#53005) +- Fix log viewing for skipped task (#53028,#53101) +- Fixes Grid view refresh after user actions (#53086) +- Fix ``no_status`` and ``duration`` for grid summaries (#53092) +- Fix ``ti.log_url`` not in Task Context (#50376) +- Fix XCom data deserialization when using ``XCom.get_all()`` method (#53102) Miscellaneous """"""""""""" @@ -99,6 +106,7 @@ Doc Only Changes - Add http-only warning when running behind proxy in documentation (#52699) - Publish separate docs for Task SDK (#52682) - Streamline Taskflow examples and link to core tutorial (#52709) +- Refresh Public Interface & align how-to guides for Airflow 3.0+ (#53011) Airflow 3.0.2 (2025-06-10) -------------------------- diff --git a/reproducible_build.yaml b/reproducible_build.yaml index 5da4c00513713..a75a2c426a177 100644 --- a/reproducible_build.yaml +++ b/reproducible_build.yaml @@ -1,2 +1,2 @@ -release-notes-hash: ddd077b4b5c548694222c69b5f4f239f -source-date-epoch: 1751886321 +release-notes-hash: 7079979bff6b12b0e8cff21b3e453319 +source-date-epoch: 1752089294