Open
Labels
affected_version:3.0, area:TaskGroup, area:core, area:dynamic-task-mapping, AIP-42, kind:bug, priority:high
Description
Apache Airflow version
main (development)
If "Other Airflow 2 version" selected, which one?
No response
What happened?
Attached screen recording: Screen.Recording.2025-09-04.at.2.20.19.PM.mov
What you think should happen instead?
No response
How to reproduce
Run the DAG below and try expanding and collapsing the task groups:
from airflow.sdk import DAG
from airflow.decorators import task
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.utils.task_group import TaskGroup
from datetime import datetime
from dataclasses import dataclass
from dataclasses_json import dataclass_json
from random import seed, randint


@dataclass_json
@dataclass
class Payload:
    seed: int
    a: str
    b: str
    c: str
    d: str

    @staticmethod
    def make(i):
        # Build a payload whose fields are the digits of i repeated 5/10/15/20 times
        return Payload(
            seed=i, a=str(i) * 5, b=str(i) * 10, c=str(i) * 15, d=str(i) * 20
        )

    def transform(self):
        # Note: b, c and d are all derived from the already-updated self.a
        self.a = str(int(int(self.a) / 5))
        self.b = str(int(int(self.a) / 10))
        self.c = str(int(int(self.a) / 15))
        self.d = str(int(int(self.a) / 20))


@task
def mk_payloads(i):
    # range(1, i) yields i - 1 payloads
    return [Payload.make(j).to_dict() for j in range(1, i)]


@task
def transform(_payload):
    payload = Payload.from_dict(_payload)
    payload.transform()
    return payload.to_dict()


@task
def check_transformed(_payload):
    payload: Payload = Payload.from_dict(_payload)
    orig = Payload.make(payload.seed)
    orig.transform()
    # same transformation made to same data
    # results should be same, otherwise something got lost along the way
    assert orig == payload


minmax_map_indices_per_lane = (5, 60)
lanes = 10

with DAG(
    dag_id="many_expand",
    schedule=None,
    start_date=datetime(1970, 1, 1),
    catchup=False,
    tags=["taskmap"],
) as dag:
    seed(42)  # be deterministic
    for i in range(lanes):
        with TaskGroup(group_id=f"lane{i+1}"):
            min_map, max_map = minmax_map_indices_per_lane
            mapped = randint(min_map, max_map)
            check_transformed.expand(
                _payload=transform.expand(_payload=mk_payloads(mapped))
            )
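To get a feel for how many mapped task instances each task group holds before opening the UI, the lane sizing can be reproduced outside Airflow. The following is only a rough sketch: `lane_sizes`, `MIN_MAP`, `MAX_MAP` and `LANES` are illustrative names that are not part of the DAG, and it assumes nothing else draws from the `random` module between `seed(42)` and the loop at parse time.

```python
from random import seed, randint

# Values copied from the DAG above (minmax_map_indices_per_lane and lanes).
MIN_MAP, MAX_MAP = 5, 60
LANES = 10


def lane_sizes(rng_seed=42):
    """Return, per task group, how many payloads mk_payloads would produce."""
    seed(rng_seed)  # same seeding as the DAG file
    sizes = {}
    for i in range(LANES):
        mapped = randint(MIN_MAP, MAX_MAP)
        # mk_payloads iterates range(1, mapped), so it yields mapped - 1 items,
        # and transform / check_transformed each expand over that many items.
        sizes[f"lane{i + 1}"] = mapped - 1
    return sizes


if __name__ == "__main__":
    for lane, n in lane_sizes().items():
        print(f"{lane}: {n} mapped instances each of transform and check_transformed")
```

Each lane therefore has between 4 and 59 mapped instances per mapped task, which is the load the task groups carry when they are expanded and collapsed.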
Operating System
Linux
Versions of Apache Airflow Providers
No response
Deployment
Other
Deployment details
No response
Anything else?
No response
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct