Skip to content

Conversation

amoghrajesh
Copy link
Contributor

@amoghrajesh amoghrajesh commented Jul 3, 2025

Problem

Mapped tasks on old client of version (1.0.1) and new airflow API server (3.0.2) started failing.

Example DAG:

import datetime

from airflow.decorators import dag
from airflow.decorators import task

@dag(start_date=datetime.datetime(2024, 10, 1), schedule=None, catchup=False)
def dynamic_xcom():
    @task
    def task_1():
        print("task_1")

    @task
    def task_2(x):
        print("task_2")
        print(x)
        return x

    @task
    def task_3(y):
        print("task_3")
        print(y)

    t1 = task_1()
    t2 = task_2.expand(x=[0, 1, 2])
    t1 >> t2
    task_3.expand(y=t2)
dynamic_xcom()

Failure:
image

The issue was in the Cadwyn version downgrade logic that converts API responses for older clients. When _get_upstream_map_indexes correctly returned {"task_2": None}, the downgrade_upstream_map_indexes function was incorrectly converting None to -1 as it seemed semantically right. This conversion was breaking the Task SDK's mapped task expansion logic.

The task SDK's get_task_map_length function expects None values in upstream_map_indexes. However, when None was converted to -1, the function calculated (-1 or 1) * len(resolved_val) = -1 * 3 = -3, which triggered the "cannot expand field mapped to length -3".

Fix and testing

DAG:

import datetime

from airflow.decorators import dag
from airflow.decorators import task

@dag(start_date=datetime.datetime(2024, 10, 1), schedule=None, catchup=False)
def dynamic_xcom():
    @task
    def task_1():
        print("task_1")

    @task
    def task_2(x):
        print("task_2")
        print(x)
        return x

    @task
    def task_3(y):
        print("task_3")
        print(y)

    t1 = task_1()
    t2 = task_2.expand(x=[0, 1, 2])
    t1 >> t2
    task_3.expand(y=t2)
dynamic_xcom()

image

  • Also added a test case that checks for this explictly.
  • This fix will ensure that sdk like 1.0.1 can still work with latest airflow API server (3.0.2+)

^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in airflow-core/newsfragments.

@amoghrajesh amoghrajesh requested review from ashb and kaxil as code owners July 3, 2025 15:34
@boring-cyborg boring-cyborg bot added the area:API Airflow's REST/HTTP API label Jul 3, 2025
@kaxil kaxil added the backport-to-v3-0-test Mark PR with this label to backport to v3-0-test branch label Jul 3, 2025
@kaxil kaxil added this to the Airflow 3.0.4 milestone Jul 3, 2025
@kaxil kaxil merged commit 2bb5d01 into apache:main Jul 3, 2025
104 checks passed
@kaxil kaxil deleted the fix-bad-cadwyn-migration branch July 3, 2025 16:57
Copy link

github-actions bot commented Jul 3, 2025

Backport failed to create: v3-0-test. View the failure log Run details

Status Branch Result
v3-0-test Commit Link

You can attempt to backport this manually by running:

cherry_picker 2bb5d01 v3-0-test

This should apply the commit to the v3-0-test branch and leave the commit in conflict state marking
the files that need manual conflict resolution.

After you have resolved the conflicts, you can continue the backport process by running:

cherry_picker --continue

kaxil pushed a commit that referenced this pull request Jul 3, 2025
@kaxil
Copy link
Member

kaxil commented Jul 3, 2025

Backported: d7825f1

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area:API Airflow's REST/HTTP API backport-to-v3-0-test Mark PR with this label to backport to v3-0-test branch
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants