@kaxil kaxil commented Jul 9, 2025

Until #44354 is implemented, when a task is killed externally or its supervisor process dies unexpectedly, users have no way of knowing it happened.

This has been a blocker for Airflow 3.0 adoption for some:

- #44354
- https://apache-airflow.slack.com/archives/C07813CNKA8/p1751057525231389

#44354 is more involved, and we might not get to it for Airflow 3.1, so this is a good interim fix, similar to how we run Dag Run callbacks.

Ran an example DAG:

import time

from airflow.providers.standard.operators.python import PythonOperator
from airflow.sdk import dag


@dag
def my_dag():

    def fail_cb(context):
        # Failure callback: prints a marker and the context it receives.
        print("From inside the callback")
        print(context)

    def n_s():
        # Sleep long enough to kill the task process while it is running.
        time.sleep(3000)

    PythonOperator(
        task_id="my_task",
        python_callable=n_s,
        on_failure_callback=fail_cb,
    )


my_dag()
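
The callback in the example prints the entire context, which is verbose. Below is a minimal sketch of a callback that reports only a few identifying fields. It assumes the context exposes `ti` (with `dag_id`, `task_id`, and `try_number`) and `run_id`, as the context logged in this PR does. `summarize_failure` is a hypothetical helper name, and the `SimpleNamespace` stand-in exists only so the sketch runs without Airflow installed.

```python
from types import SimpleNamespace


def summarize_failure(context):
    """Build a one-line summary from a task callback context (hypothetical helper)."""
    ti = context["ti"]
    return (
        f"Task failed: dag_id={ti.dag_id} task_id={ti.task_id} "
        f"run_id={context['run_id']} try_number={ti.try_number}"
    )


def fail_cb(context):
    # Print a compact summary instead of the full context dict.
    print(summarize_failure(context))


# Stand-in context for trying the callback outside Airflow. Assumption:
# a real context carries at least these keys and attributes.
fake_context = {
    "run_id": "manual__2025-07-08T22:44:37.346022+00:00",
    "ti": SimpleNamespace(dag_id="my_dag", task_id="my_task", try_number=2),
}
fail_cb(fake_context)
```

In a DAG, `fail_cb` would be passed as `on_failure_callback`, exactly as in the example above.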

DAG Processor logs after I killed the supervisor and task processes while the task was running:

{"timestamp":"2025-07-08T22:48:31.568171Z","level":"info","event":"{'dag': <DAG: my_dag>, 'inlets': [], 'map_index_template': None, 'outlets': [], 'run_id': 'manual__2025-07-08T22:44:37.346022+00:00', 'task': <Task(PythonOperator): my_task>, 'task_instance': RuntimeTaskInstance(id=UUID('0197ec36-656a-75b7-814a-c409886945e2'), task_id='my_task', dag_id='my_dag', run_id='manual__2025-07-08T22:44:37.346022+00:00', try_number=2, dag_version_id=UUID('0197ec0c-669e-7e05-80e3-97c3549dbfda'), map_index=-1, hostname='c9bd6c415dbf', context_carrier={}, task=<Task(PythonOperator): my_task>, max_tries=0, end_date=None, state=None, is_mapped=None, rendered_map_index=None, log_url=None), 'ti': RuntimeTaskInstance(id=UUID('0197ec36-656a-75b7-814a-c409886945e2'), task_id='my_task', dag_id='my_dag', run_id='manual__2025-07-08T22:44:37.346022+00:00', try_number=2, dag_version_id=UUID('0197ec0c-669e-7e05-80e3-97c3549dbfda'), map_index=-1, hostname='c9bd6c415dbf', context_carrier={}, task=<Task(PythonOperator): my_task>, max_tries=0, end_date=None, state=None, is_mapped=None, rendered_map_index=None, log_url=None), 'outlet_events': <airflow.sdk.execution_time.context.OutletEventAccessors object at 0xffff6cb48880>, 'inlet_events': InletEventsAccessors(_inlets=[], _assets={}, _asset_aliases={}), 'macros': <MacrosAccessor (dynamic access to macros)>, 'params': {}, 'var': {'json': <VariableAccessor (dynamic access)>, 'value': <VariableAccessor (dynamic access)>}, 'conn': <ConnectionAccessor (dynamic access)>, 'dag_run': DagRun(dag_id='my_dag', run_id='manual__2025-07-08T22:44:37.346022+00:00', logical_date=datetime.datetime(2025, 7, 8, 22, 44, 35, 842000, tzinfo=Timezone('UTC')), data_interval_start=datetime.datetime(2025, 7, 8, 22, 44, 35, 842000, tzinfo=Timezone('UTC')), data_interval_end=datetime.datetime(2025, 7, 8, 22, 44, 35, 842000, tzinfo=Timezone('UTC')), run_after=datetime.datetime(2025, 7, 8, 22, 44, 35, 842000, tzinfo=Timezone('UTC')), start_date=datetime.datetime(2025, 7, 8, 
22, 44, 44, 227565, tzinfo=Timezone('UTC')), end_date=None, clear_number=0, run_type=<DagRunType.MANUAL: 'manual'>, conf={}, consumed_asset_events=[]), 'triggering_asset_events': TriggeringAssetEventsAccessor(_events=defaultdict(<class 'list'>, {})), 'task_instance_key_str': 'my_dag__my_task__20250708', 'task_reschedule_count': 0, 'prev_start_date_success': <Proxy at 0xffff6cb48cd0 with factory <function RuntimeTaskInstance.get_template_context.<locals>.<lambda> at 0xffff6c494280>>, 'prev_end_date_success': <Proxy at 0xffff6cb48d30 with factory <function RuntimeTaskInstance.get_template_context.<locals>.<lambda> at 0xffff6c8e5120>>, 'logical_date': DateTime(2025, 7, 8, 22, 44, 35, 842000, tzinfo=Timezone('UTC')), 'ds': '2025-07-08', 'ds_nodash': '20250708', 'ts': '2025-07-08T22:44:35.842000+00:00', 'ts_nodash': '20250708T224435', 'ts_nodash_with_tz': '20250708T224435.842000+0000', 'data_interval_end': DateTime(2025, 7, 8, 22, 44, 35, 842000, tzinfo=Timezone('UTC')), 'data_interval_start': DateTime(2025, 7, 8, 22, 44, 35, 842000, tzinfo=Timezone('UTC')), 'prev_data_interval_start_success': <Proxy at 0xffff6cb48dc0 with factory <function RuntimeTaskInstance.get_template_context.<locals>.<lambda> at 0xffff6c8e51b0>>, 'prev_data_interval_end_success': <Proxy at 0xffff6cb48df0 with factory <function RuntimeTaskInstance.get_template_context.<locals>.<lambda> at 0xffff6c8e5240>>}","chan":"stdout","logger":"processor"}
{"timestamp":"2025-07-08T22:48:31.568235Z","level":"info","event":"From inside the callback","chan":"stdout","logger":"processor"}
{"timestamp":"2025-07-08T22:48:31.572041Z","level":"info","event":"{'dag': <DAG: my_dag>, 'inlets': [], 'map_index_template': None, 'outlets': [], 'run_id': 'manual__2025-07-08T22:44:37.346022+00:00', 'task': <Task(PythonOperator): my_task>, 'task_instance': RuntimeTaskInstance(id=UUID('0197ec36-656a-75b7-814a-c409886945e2'), task_id='my_task', dag_id='my_dag', run_id='manual__2025-07-08T22:44:37.346022+00:00', try_number=2, dag_version_id=UUID('0197ec0c-669e-7e05-80e3-97c3549dbfda'), map_index=-1, hostname='c9bd6c415dbf', context_carrier={}, task=<Task(PythonOperator): my_task>, max_tries=0, end_date=None, state=None, is_mapped=None, rendered_map_index=None, log_url=None), 'ti': RuntimeTaskInstance(id=UUID('0197ec36-656a-75b7-814a-c409886945e2'), task_id='my_task', dag_id='my_dag', run_id='manual__2025-07-08T22:44:37.346022+00:00', try_number=2, dag_version_id=UUID('0197ec0c-669e-7e05-80e3-97c3549dbfda'), map_index=-1, hostname='c9bd6c415dbf', context_carrier={}, task=<Task(PythonOperator): my_task>, max_tries=0, end_date=None, state=None, is_mapped=None, rendered_map_index=None, log_url=None), 'outlet_events': <airflow.sdk.execution_time.context.OutletEventAccessors object at 0xffff6cb48e20>, 'inlet_events': InletEventsAccessors(_inlets=[], _assets={}, _asset_aliases={}), 'macros': <MacrosAccessor (dynamic access to macros)>, 'params': {}, 'var': {'json': <VariableAccessor (dynamic access)>, 'value': <VariableAccessor (dynamic access)>}, 'conn': <ConnectionAccessor (dynamic access)>, 'dag_run': DagRun(dag_id='my_dag', run_id='manual__2025-07-08T22:44:37.346022+00:00', logical_date=datetime.datetime(2025, 7, 8, 22, 44, 35, 842000, tzinfo=Timezone('UTC')), data_interval_start=datetime.datetime(2025, 7, 8, 22, 44, 35, 842000, tzinfo=Timezone('UTC')), data_interval_end=datetime.datetime(2025, 7, 8, 22, 44, 35, 842000, tzinfo=Timezone('UTC')), run_after=datetime.datetime(2025, 7, 8, 22, 44, 35, 842000, tzinfo=Timezone('UTC')), start_date=datetime.datetime(2025, 7, 8, 
22, 44, 44, 227565, tzinfo=Timezone('UTC')), end_date=None, clear_number=0, run_type=<DagRunType.MANUAL: 'manual'>, conf={}, consumed_asset_events=[]), 'triggering_asset_events': TriggeringAssetEventsAccessor(_events=defaultdict(<class 'list'>, {})), 'task_instance_key_str': 'my_dag__my_task__20250708', 'task_reschedule_count': 0, 'prev_start_date_success': <Proxy at 0xffff6cb48ca0 with factory <function RuntimeTaskInstance.get_template_context.<locals>.<lambda> at 0xffff6c494280>>, 'prev_end_date_success': <Proxy at 0xffff6cb48c70 with factory <function RuntimeTaskInstance.get_template_context.<locals>.<lambda> at 0xffff6c8e52d0>>, 'logical_date': DateTime(2025, 7, 8, 22, 44, 35, 842000, tzinfo=Timezone('UTC')), 'ds': '2025-07-08', 'ds_nodash': '20250708', 'ts': '2025-07-08T22:44:35.842000+00:00', 'ts_nodash': '20250708T224435', 'ts_nodash_with_tz': '20250708T224435.842000+0000', 'data_interval_end': DateTime(2025, 7, 8, 22, 44, 35, 842000, tzinfo=Timezone('UTC')), 'data_interval_start': DateTime(2025, 7, 8, 22, 44, 35, 842000, tzinfo=Timezone('UTC')), 'prev_data_interval_start_success': <Proxy at 0xffff6cb488e0 with factory <function RuntimeTaskInstance.get_template_context.<locals>.<lambda> at 0xffff6c8e5240>>, 'prev_data_interval_end_success': <Proxy at 0xffff6cb48880 with factory <function RuntimeTaskInstance.get_template_context.<locals>.<lambda> at 0xffff6c8e51b0>>}","chan":"stdout","logger":"processor"}
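
Each DAG Processor log line above is a JSON object with `timestamp`, `level`, `event`, `chan`, and `logger` fields. The following is a minimal sketch for pulling callback output out of such logs, assuming one JSON object per line. `callback_events` is a hypothetical helper, and the second and third sample lines are made up for illustration.

```python
import json


def callback_events(lines, logger="processor", chan="stdout"):
    """Yield (timestamp, event) for JSON log lines from the given logger and channel."""
    for line in lines:
        line = line.strip()
        if not line:
            continue
        try:
            record = json.loads(line)
        except json.JSONDecodeError:
            continue  # skip lines that are not valid JSON (e.g. wrapped output)
        if not isinstance(record, dict):
            continue
        if record.get("logger") == logger and record.get("chan") == chan:
            yield record.get("timestamp"), record.get("event")


sample = [
    # Taken from the logs in this PR.
    '{"timestamp":"2025-07-08T22:48:31.568235Z","level":"info",'
    '"event":"From inside the callback","chan":"stdout","logger":"processor"}',
    # Hypothetical line from a different logger; it should be filtered out.
    '{"timestamp":"2025-07-08T22:48:31.570000Z","level":"info",'
    '"event":"other","chan":"stdout","logger":"scheduler"}',
    # Hypothetical non-JSON line; it should be skipped.
    "not json",
]

events = list(callback_events(sample))
for ts, event in events:
    print(ts, event)
```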


@boring-cyborg boring-cyborg bot added the area:API, area:DAG-processing, area:Scheduler, and area:task-sdk labels Jul 9, 2025
@kaxil kaxil added this to the Airflow 3.0.4 milestone Jul 9, 2025
@amoghrajesh amoghrajesh changed the title Run Task failure callbacks on DAG Processor when task is externally k… Run Task failure callbacks on DAG Processor when task is externally killed Jul 9, 2025
@amoghrajesh amoghrajesh left a comment

Minor nits, looks good otherwise.

@kaxil kaxil force-pushed the task-level-callback branch from fb0fa28 to 9d562d0 Compare July 9, 2025 10:24
…illed

Until apache#44354 is implemented, when a task is killed externally or its supervisor process dies unexpectedly, users have no way of knowing it happened.

This has been a blocker for Airflow 3.0 adoption for some:

- apache#44354
- https://apache-airflow.slack.com/archives/C07813CNKA8/p1751057525231389

apache#44354 is more involved, and we might not get to it for Airflow 3.1, so this is a good interim fix, similar to how we run Dag Run callbacks.
@kaxil kaxil force-pushed the task-level-callback branch from 9d562d0 to 2f67859 Compare July 9, 2025 10:28
@kaxil kaxil added the backport-to-v3-0-test label Jul 9, 2025
@kaxil kaxil merged commit a5211f2 into apache:main Jul 9, 2025
102 checks passed
@kaxil kaxil deleted the task-level-callback branch July 9, 2025 12:51
github-actions bot commented Jul 9, 2025

Backport failed to create: v3-0-test.

Status Branch Result
v3-0-test Commit Link

You can attempt to backport this manually by running:

cherry_picker a5211f2 v3-0-test

This should apply the commit to the v3-0-test branch and leave the commit in conflict state marking
the files that need manual conflict resolution.

After you have resolved the conflicts, you can continue the backport process by running:

cherry_picker --continue

kaxil added a commit to astronomer/airflow that referenced this pull request Jul 10, 2025
…illed (apache#53058)

(cherry-picked from a5211f2)
kaxil commented Jul 10, 2025

Backport PR: #53143

HsiuChuanHsu pushed a commit to HsiuChuanHsu/airflow that referenced this pull request Jul 10, 2025
…illed (apache#53058)

kaxil added a commit that referenced this pull request Jul 10, 2025
…illed (#53058) (#53143)

(cherry-picked from a5211f2)
kaxil added a commit that referenced this pull request Jul 11, 2025
…illed (#53058) (#53143)

(cherry-picked from a5211f2)
kaxil added a commit that referenced this pull request Jul 11, 2025
…illed (#53058) (#53143)

(cherry-picked from a5211f2)
stephen-bracken pushed a commit to stephen-bracken/airflow that referenced this pull request Jul 15, 2025
…illed (apache#53058)

kaxil added a commit to astronomer/airflow that referenced this pull request Jul 23, 2025
This ensures DAG callbacks receive the same rich context as task callbacks,
improving consistency and providing access to template variables and macros similar to Airflow 2.

This has been a blocker for a few users, similar to apache#53058

Fixes apache#52824
Fixes apache#51402
Closes apache#51949
Related to apache#53654
Related to apache#53618
kaxil added a commit to astronomer/airflow that referenced this pull request Jul 23, 2025
kaxil added a commit to astronomer/airflow that referenced this pull request Jul 23, 2025
kaxil added a commit to astronomer/airflow that referenced this pull request Jul 24, 2025
kaxil added a commit to astronomer/airflow that referenced this pull request Jul 24, 2025
kaxil added a commit that referenced this pull request Jul 24, 2025
ferruzzi pushed a commit to aws-mwaa/upstream-to-airflow that referenced this pull request Aug 7, 2025
kaxil added a commit that referenced this pull request Aug 9, 2025
(cherry picked from commit ef80507)
fweilun pushed a commit to fweilun/airflow that referenced this pull request Aug 11, 2025
Labels: area:API, area:DAG-processing, area:Scheduler, area:task-sdk, backport-to-v3-0-test