Skip to content

Commit 6723f19

Browse files
github-actions[bot]pierrejeambrun
authored and committed
[v3-0-test] Structure endpoint attach downstream asset to task level (#51401) (#51425)
* Attach downstream assets to task * Adjust tests (cherry picked from commit 14ee1f4) Co-authored-by: Pierre Jeambrun <[email protected]>
1 parent f4c68ad commit 6723f19

File tree

3 files changed

+38
-2
lines changed

3 files changed

+38
-2
lines changed

airflow-core/src/airflow/api_fastapi/core_api/routes/ui/structure.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,10 @@
2626
from airflow.api_fastapi.core_api.datamodels.ui.structure import StructureDataResponse
2727
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
2828
from airflow.api_fastapi.core_api.security import requires_access_dag
29-
from airflow.api_fastapi.core_api.services.ui.structure import get_upstream_assets
29+
from airflow.api_fastapi.core_api.services.ui.structure import (
30+
bind_output_assets_to_tasks,
31+
get_upstream_assets,
32+
)
3033
from airflow.models.dag_version import DagVersion
3134
from airflow.models.serialized_dag import SerializedDagModel
3235
from airflow.utils.dag_edges import dag_edges
@@ -139,4 +142,6 @@ def structure_data(
139142

140143
data["edges"] += start_edges + edges + end_edges
141144

145+
bind_output_assets_to_tasks(data["edges"], serialized_dag)
146+
142147
return StructureDataResponse(**data)

airflow-core/src/airflow/api_fastapi/core_api/services/ui/structure.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323

2424
from __future__ import annotations
2525

26+
from airflow.models.serialized_dag import SerializedDagModel
27+
2628

2729
def get_upstream_assets(
2830
asset_expression: dict, entry_node_ref: str, level: int = 0
@@ -112,3 +114,32 @@ def get_upstream_assets(
112114
edges = edges + e
113115

114116
return nodes, edges
117+
118+
119+
def bind_output_assets_to_tasks(edges: list[dict], serialized_dag: SerializedDagModel) -> None:
    """
    Try to bind the downstream assets to the relevant task that produces them.

    Every edge whose ``target_id`` starts with ``asset:`` gets its ``source_id`` replaced by the
    ``task_id`` of the first outlet asset reference carrying the same asset id. Edges for which no
    reference matches are left untouched.

    This function will mutate the `edges` in place.

    :param edges: Edge dicts; each must carry a string ``target_id`` key.
    :param serialized_dag: Object whose ``dag_model.task_outlet_asset_references`` is used for the lookup.
    :raises ValueError: If the text following the ``asset:`` prefix is not an integer.
    """
    asset_prefix = "asset:"

    # Index the references once instead of scanning them for every edge. ``setdefault`` keeps the
    # first reference seen for a given asset id, i.e. the same one a linear first-match scan returns.
    task_id_by_asset_id: dict = {}
    for outlet_asset_reference in serialized_dag.dag_model.task_outlet_asset_references:
        task_id_by_asset_id.setdefault(outlet_asset_reference.asset_id, outlet_asset_reference.task_id)

    downstream_asset_related_edges = [edge for edge in edges if edge["target_id"].startswith(asset_prefix)]

    for edge in downstream_asset_related_edges:
        # Slice the prefix off rather than using ``str.strip("asset:")``: ``strip`` removes any of the
        # characters "a", "s", "e", "t", ":" from *both* ends, so a malformed id such as "asset:2a"
        # would silently be parsed as 2 instead of being rejected.
        asset_id = int(edge["target_id"][len(asset_prefix) :])

        if asset_id in task_id_by_asset_id:
            # Attach the outlet asset to the task that produces it.
            edge["source_id"] = task_id_by_asset_id[asset_id]
        # Otherwise keep the existing source (the exit node reference set by the caller).
        # This can happen because asset aliases are not yet handled, they do not populate
        # the `outlet_asset_references` when resolved. Extra lookup is needed. Same for asset-name-ref and
        # asset-uri-ref.

airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_structure.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -369,7 +369,7 @@ def test_should_return_200_with_asset(self, test_client, asset1_id, asset2_id, a
369369
{
370370
"is_setup_teardown": None,
371371
"label": None,
372-
"source_id": "task_2",
372+
"source_id": "task_1",
373373
"target_id": f"asset:{asset3_id}",
374374
"is_source_asset": None,
375375
},

0 commit comments

Comments
 (0)