Skip to content

Commit d7825f1

Browse files
amoghrajeshkaxil
authored andcommitted
Fixing bad cadwyn migration for upstream map indexes (#52797)
(cherry picked from commit 2bb5d01)
1 parent 8f37d4f commit d7825f1

File tree

4 files changed

+16
-7
lines changed

4 files changed

+16
-7
lines changed

RELEASE_NOTES.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ Bug Fixes
7777
- Add ti information to re-queue logs (#49995)
7878
- Task SDK: Fix ``AssetEventOperations.get`` to use ``alias_name`` when specified (#52324)
7979
- Ensure trigger kwargs are properly deserialized during trigger execution (#52721)
80+
- Fixing bad cadwyn migration for upstream map indexes (#52797)
8081

8182
Miscellaneous
8283
"""""""""""""

airflow-core/src/airflow/api_fastapi/execution_api/versions/v2025_05_20.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ class DowngradeUpstreamMapIndexes(VersionChange):
3030
description = __doc__
3131

3232
instructions_to_migrate_to_previous_version = (
33-
schema(TIRunContext).field("upstream_map_indexes").had(type=Optional[dict[str, int]]), # type: ignore
33+
schema(TIRunContext).field("upstream_map_indexes").had(type=Optional[dict[str, Optional[int]]]),
3434
)
3535

3636
@convert_response_to_previous_version_for(TIRunContext) # type: ignore[arg-type]
@@ -42,13 +42,14 @@ def downgrade_upstream_map_indexes(response: ResponseInfo = None) -> None: # ty
4242
"""
4343
resp = response.body.get("upstream_map_indexes")
4444
if isinstance(resp, dict):
45-
downgraded = {}
45+
downgraded: dict[str, int | list | None] = {}
4646
for k, v in resp.items():
4747
if isinstance(v, int):
4848
downgraded[k] = v
4949
elif isinstance(v, list) and v and all(isinstance(i, int) for i in v):
5050
downgraded[k] = v[0]
5151
else:
52-
# for cases like None, make it -1
53-
downgraded[k] = -1
52+
# Keep values like None as is — the Task SDK expects them unchanged during mapped task expansion,
53+
# and modifying them can cause unexpected failures.
54+
downgraded[k] = None
5455
response.body["upstream_map_indexes"] = downgraded

airflow-core/tests/unit/api_fastapi/execution_api/versions/v2025_04_28/test_task_instances.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,9 +62,16 @@ def teardown_method(self):
6262
{"task_a": 3, "task_b": 9},
6363
id="list of ints",
6464
),
65+
pytest.param(
66+
[
67+
("task_a", None),
68+
],
69+
{"task_a": None},
70+
id="task has no upstreams",
71+
),
6572
pytest.param(
6673
[("task_a", None), ("task_b", [6, 7]), ("task_c", 2)],
67-
{"task_a": -1, "task_b": 6, "task_c": 2},
74+
{"task_a": None, "task_b": 6, "task_c": 2},
6875
id="mixed types",
6976
),
7077
],

reproducible_build.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
1-
release-notes-hash: 3a16d9fdbc3cac54b23e0cac1e92ff10
2-
source-date-epoch: 1751573977
1+
release-notes-hash: de8144e966c67986caa98ae835947858
2+
source-date-epoch: 1751574576

0 commit comments

Comments
 (0)