Skip to content

Commit 8c86c5e

Browse files
authored
chore: use task_instance as source for all airflow identifiers used in listener (#52339)
1 parent c1c8173 commit 8c86c5e

File tree

3 files changed

+75
-102
lines changed

3 files changed

+75
-102
lines changed

providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py

Lines changed: 21 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ def _on_task_instance_running(
147147
"Skipping OpenLineage event emission for task `%s` "
148148
"due to lack of explicit lineage enablement for task or DAG while "
149149
"[openlineage] selective_enable is on.",
150-
task.task_id,
150+
task_instance.task_id,
151151
)
152152
return
153153

@@ -170,14 +170,14 @@ def on_running():
170170
clear_number = dagrun.clear_number
171171

172172
parent_run_id = self.adapter.build_dag_run_id(
173-
dag_id=dag.dag_id,
173+
dag_id=task_instance.dag_id,
174174
logical_date=date,
175175
clear_number=clear_number,
176176
)
177177

178178
task_uuid = self.adapter.build_task_instance_run_id(
179-
dag_id=dag.dag_id,
180-
task_id=task.task_id,
179+
dag_id=task_instance.dag_id,
180+
task_id=task_instance.task_id,
181181
try_number=task_instance.try_number,
182182
logical_date=date,
183183
map_index=task_instance.map_index,
@@ -199,7 +199,7 @@ def on_running():
199199

200200
redacted_event = self.adapter.start_task(
201201
run_id=task_uuid,
202-
job_name=get_job_name(task),
202+
job_name=get_job_name(task_instance),
203203
job_description=dag.description,
204204
event_time=start_date.isoformat(),
205205
nominal_start_time=data_interval_start,
@@ -278,7 +278,7 @@ def _on_task_instance_success(self, task_instance: RuntimeTaskInstance, dag, dag
278278
"Skipping OpenLineage event emission for task `%s` "
279279
"due to lack of explicit lineage enablement for task or DAG while "
280280
"[openlineage] selective_enable is on.",
281-
task.task_id,
281+
task_instance.task_id,
282282
)
283283
return
284284

@@ -289,14 +289,14 @@ def on_success():
289289
date = dagrun.run_after
290290

291291
parent_run_id = self.adapter.build_dag_run_id(
292-
dag_id=dag.dag_id,
292+
dag_id=task_instance.dag_id,
293293
logical_date=date,
294294
clear_number=dagrun.clear_number,
295295
)
296296

297297
task_uuid = self.adapter.build_task_instance_run_id(
298-
dag_id=dag.dag_id,
299-
task_id=task.task_id,
298+
dag_id=task_instance.dag_id,
299+
task_id=task_instance.task_id,
300300
try_number=task_instance.try_number,
301301
logical_date=date,
302302
map_index=task_instance.map_index,
@@ -321,7 +321,7 @@ def on_success():
321321

322322
redacted_event = self.adapter.complete_task(
323323
run_id=task_uuid,
324-
job_name=get_job_name(task),
324+
job_name=get_job_name(task_instance),
325325
end_time=end_date.isoformat(),
326326
task=task_metadata,
327327
# If task owner is default ("airflow"), use DAG owner instead that may have more details
@@ -409,7 +409,7 @@ def _on_task_instance_failed(
409409
"Skipping OpenLineage event emission for task `%s` "
410410
"due to lack of explicit lineage enablement for task or DAG while "
411411
"[openlineage] selective_enable is on.",
412-
task.task_id,
412+
task_instance.task_id,
413413
)
414414
return
415415

@@ -420,14 +420,14 @@ def on_failure():
420420
date = dagrun.run_after
421421

422422
parent_run_id = self.adapter.build_dag_run_id(
423-
dag_id=dag.dag_id,
423+
dag_id=task_instance.dag_id,
424424
logical_date=date,
425425
clear_number=dagrun.clear_number,
426426
)
427427

428428
task_uuid = self.adapter.build_task_instance_run_id(
429-
dag_id=dag.dag_id,
430-
task_id=task.task_id,
429+
dag_id=task_instance.dag_id,
430+
task_id=task_instance.task_id,
431431
try_number=task_instance.try_number,
432432
logical_date=date,
433433
map_index=task_instance.map_index,
@@ -452,7 +452,7 @@ def on_failure():
452452

453453
redacted_event = self.adapter.fail_task(
454454
run_id=task_uuid,
455-
job_name=get_job_name(task),
455+
job_name=get_job_name(task_instance),
456456
end_time=end_date.isoformat(),
457457
task=task_metadata,
458458
error=error,
@@ -489,13 +489,13 @@ def _on_task_instance_manual_state_change(
489489
def on_state_change():
490490
date = dagrun.logical_date or dagrun.run_after
491491
parent_run_id = self.adapter.build_dag_run_id(
492-
dag_id=dagrun.dag_id,
492+
dag_id=ti.dag_id,
493493
logical_date=date,
494494
clear_number=dagrun.clear_number,
495495
)
496496

497497
task_uuid = self.adapter.build_task_instance_run_id(
498-
dag_id=dagrun.dag_id,
498+
dag_id=ti.dag_id,
499499
task_id=ti.task_id,
500500
try_number=ti.try_number,
501501
logical_date=date,
@@ -507,6 +507,10 @@ def on_state_change():
507507
"job_name": get_job_name(ti),
508508
"end_time": end_date.isoformat(),
509509
"task": OperatorLineage(),
510+
"nominal_start_time": None,
511+
"nominal_end_time": None,
512+
"tags": None,
513+
"owners": None,
510514
"run_facets": {
511515
**get_task_parent_run_facet(parent_run_id=parent_run_id, parent_job_name=ti.dag_id),
512516
**get_airflow_debug_facet(),

providers/openlineage/src/airflow/providers/openlineage/utils/utils.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@
7979
SecretsMasker,
8080
should_hide_value_for_key,
8181
)
82+
from airflow.sdk.execution_time.task_runner import RuntimeTaskInstance
8283
from airflow.utils.state import DagRunState, TaskInstanceState
8384
else:
8485
try:
@@ -127,7 +128,7 @@ def get_operator_class(task: BaseOperator) -> type:
127128
return task.__class__
128129

129130

130-
def get_job_name(task: TaskInstance) -> str:
131+
def get_job_name(task: TaskInstance | RuntimeTaskInstance) -> str:
131132
return f"{task.dag_id}.{task.task_id}"
132133

133134

0 commit comments

Comments
 (0)