Skip to content

Commit 2bcc5a2

Browse files
amoghrajeshRoyLee1224
authored andcommitted
Do not ignore include_prior_dates in xcom_pull when map_indexes is not specified (apache#53809)
1 parent 7a39b55 commit 2bcc5a2

File tree

11 files changed

+142
-5
lines changed

11 files changed

+142
-5
lines changed

airflow-core/src/airflow/api_fastapi/execution_api/routes/xcoms.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,7 @@ class GetXComSliceFilterParams(BaseModel):
230230
start: int | None = None
231231
stop: int | None = None
232232
step: int | None = None
233+
include_prior_dates: bool = False
233234

234235

235236
@router.get(
@@ -249,6 +250,7 @@ def get_mapped_xcom_by_slice(
249250
key=key,
250251
task_ids=task_id,
251252
dag_ids=dag_id,
253+
include_prior_dates=params.include_prior_dates,
252254
session=session,
253255
)
254256
query = query.order_by(None)

airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,17 @@
2424
from airflow.api_fastapi.execution_api.versions.v2025_08_10 import (
2525
AddDagRunStateFieldAndPreviousEndpoint,
2626
AddDagVersionIdField,
27+
AddIncludePriorDatesToGetXComSlice,
2728
)
2829

2930
bundle = VersionBundle(
3031
HeadVersion(),
31-
Version("2025-08-10", AddDagVersionIdField, AddDagRunStateFieldAndPreviousEndpoint),
32+
Version(
33+
"2025-08-10",
34+
AddDagVersionIdField,
35+
AddDagRunStateFieldAndPreviousEndpoint,
36+
AddIncludePriorDatesToGetXComSlice,
37+
),
3238
Version("2025-05-20", DowngradeUpstreamMapIndexes),
3339
Version("2025-04-28", AddRenderedMapIndexField),
3440
Version("2025-04-11"),

airflow-core/src/airflow/api_fastapi/execution_api/versions/v2025_08_10.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
from cadwyn import ResponseInfo, VersionChange, convert_response_to_previous_version_for, endpoint, schema
2121

2222
from airflow.api_fastapi.execution_api.datamodels.taskinstance import DagRun, TaskInstance, TIRunContext
23+
from airflow.api_fastapi.execution_api.routes.xcoms import GetXComSliceFilterParams
2324

2425

2526
class AddDagVersionIdField(VersionChange):
@@ -45,3 +46,13 @@ def remove_state_from_dag_run(response: ResponseInfo) -> None: # type: ignore[m
4546
"""Remove the `state` field from the dag_run object when converting to the previous version."""
4647
if "dag_run" in response.body and isinstance(response.body["dag_run"], dict):
4748
response.body["dag_run"].pop("state", None)
49+
50+
51+
class AddIncludePriorDatesToGetXComSlice(VersionChange):
52+
"""Add the `include_prior_dates` field to GetXComSliceFilterParams."""
53+
54+
description = __doc__
55+
56+
instructions_to_migrate_to_previous_version = (
57+
schema(GetXComSliceFilterParams).field("include_prior_dates").didnt_exist,
58+
)

airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_xcoms.py

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import pytest
2626
from fastapi import FastAPI, HTTPException, Path, Request, status
2727

28+
from airflow._shared.timezones import timezone
2829
from airflow.api_fastapi.execution_api.datamodels.xcom import XComResponse
2930
from airflow.models.dagrun import DagRun
3031
from airflow.models.taskmap import TaskMap
@@ -273,6 +274,54 @@ def __init__(self, *, x, **kwargs):
273274
assert response.status_code == 200
274275
assert response.json() == ["f", "o", "b"][key]
275276

277+
@pytest.mark.parametrize(
278+
"include_prior_dates, expected_xcoms",
279+
[[True, ["earlier_value", "later_value"]], [False, ["later_value"]]],
280+
)
281+
def test_xcom_get_slice_accepts_include_prior_dates(
282+
self, client, dag_maker, session, include_prior_dates, expected_xcoms
283+
):
284+
"""Test that the slice endpoint accepts include_prior_dates parameter and works correctly."""
285+
286+
with dag_maker(dag_id="dag"):
287+
EmptyOperator(task_id="task")
288+
289+
earlier_run = dag_maker.create_dagrun(
290+
run_id="earlier_run", logical_date=timezone.parse("2024-01-01T00:00:00Z")
291+
)
292+
later_run = dag_maker.create_dagrun(
293+
run_id="later_run", logical_date=timezone.parse("2024-01-02T00:00:00Z")
294+
)
295+
296+
earlier_ti = earlier_run.get_task_instance("task")
297+
later_ti = later_run.get_task_instance("task")
298+
299+
earlier_xcom = XComModel(
300+
key="test_key",
301+
value="earlier_value",
302+
dag_run_id=earlier_ti.dag_run.id,
303+
run_id=earlier_ti.run_id,
304+
task_id=earlier_ti.task_id,
305+
dag_id=earlier_ti.dag_id,
306+
)
307+
later_xcom = XComModel(
308+
key="test_key",
309+
value="later_value",
310+
dag_run_id=later_ti.dag_run.id,
311+
run_id=later_ti.run_id,
312+
task_id=later_ti.task_id,
313+
dag_id=later_ti.dag_id,
314+
)
315+
session.add_all([earlier_xcom, later_xcom])
316+
session.commit()
317+
318+
response = client.get(
319+
f"/execution/xcoms/dag/later_run/task/test_key/slice?include_prior_dates={include_prior_dates}"
320+
)
321+
assert response.status_code == 200
322+
323+
assert response.json() == expected_xcoms
324+
276325

277326
class TestXComsSetEndpoint:
278327
@pytest.mark.parametrize(

task-sdk/src/airflow/sdk/api/client.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -496,6 +496,7 @@ def get_sequence_slice(
496496
start: int | None,
497497
stop: int | None,
498498
step: int | None,
499+
include_prior_dates: bool = False,
499500
) -> XComSequenceSliceResponse:
500501
params = {}
501502
if start is not None:
@@ -504,6 +505,8 @@ def get_sequence_slice(
504505
params["stop"] = stop
505506
if step is not None:
506507
params["step"] = step
508+
if include_prior_dates:
509+
params["include_prior_dates"] = include_prior_dates
507510
resp = self.client.get(f"xcoms/{dag_id}/{run_id}/{task_id}/{key}/slice", params=params)
508511
return XComSequenceSliceResponse.model_validate_json(resp.read())
509512

task-sdk/src/airflow/sdk/bases/xcom.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -275,6 +275,7 @@ def get_all(
275275
dag_id: str,
276276
task_id: str,
277277
run_id: str,
278+
include_prior_dates: bool = False,
278279
) -> Any:
279280
"""
280281
Retrieve all XCom values for a task, typically from all map indexes.
@@ -289,6 +290,9 @@ def get_all(
289290
:param run_id: DAG run ID for the task.
290291
:param dag_id: DAG ID to pull XComs from.
291292
:param task_id: Task ID to pull XComs from.
293+
:param include_prior_dates: If *False* (default), only XComs from the
294+
specified DAG run are returned. If *True*, the latest matching XComs are
295+
returned regardless of the run they belong to.
292296
:return: List of all XCom values if found.
293297
"""
294298
from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS
@@ -303,6 +307,7 @@ def get_all(
303307
start=None,
304308
stop=None,
305309
step=None,
310+
include_prior_dates=include_prior_dates,
306311
),
307312
)
308313

task-sdk/src/airflow/sdk/execution_time/comms.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -707,6 +707,7 @@ class GetXComSequenceSlice(BaseModel):
707707
start: int | None
708708
stop: int | None
709709
step: int | None
710+
include_prior_dates: bool = False
710711
type: Literal["GetXComSequenceSlice"] = "GetXComSequenceSlice"
711712

712713

task-sdk/src/airflow/sdk/execution_time/supervisor.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1132,7 +1132,14 @@ def _handle_request(self, msg: ToSupervisor, log: FilteringBoundLogger, req_id:
11321132
resp = xcom
11331133
elif isinstance(msg, GetXComSequenceSlice):
11341134
xcoms = self.client.xcoms.get_sequence_slice(
1135-
msg.dag_id, msg.run_id, msg.task_id, msg.key, msg.start, msg.stop, msg.step
1135+
msg.dag_id,
1136+
msg.run_id,
1137+
msg.task_id,
1138+
msg.key,
1139+
msg.start,
1140+
msg.stop,
1141+
msg.step,
1142+
msg.include_prior_dates,
11361143
)
11371144
resp = XComSequenceSliceResult.from_response(xcoms)
11381145
elif isinstance(msg, DeferTask):

task-sdk/src/airflow/sdk/execution_time/task_runner.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -362,6 +362,7 @@ def xcom_pull(
362362
key=key,
363363
task_id=t_id,
364364
dag_id=dag_id,
365+
include_prior_dates=include_prior_dates,
365366
)
366367

367368
if values is None:

task-sdk/tests/task_sdk/execution_time/test_supervisor.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1973,10 +1973,11 @@ def watched_subprocess(self, mocker):
19731973
start=None,
19741974
stop=None,
19751975
step=None,
1976+
include_prior_dates=False,
19761977
),
19771978
{"root": ["foo", "bar"], "type": "XComSequenceSliceResult"},
19781979
"xcoms.get_sequence_slice",
1979-
("test_dag", "test_run", "test_task", "test_key", None, None, None),
1980+
("test_dag", "test_run", "test_task", "test_key", None, None, None, False),
19801981
{},
19811982
XComSequenceSliceResult(root=["foo", "bar"]),
19821983
None,

0 commit comments

Comments
 (0)