Skip to content

Conversation

amoghrajesh
Copy link
Contributor

@amoghrajesh amoghrajesh commented Jul 7, 2025

closes: #52880

Problem

Accessing a connection from SDK (Connection or using context) in airflow 3 and if connection isn't found it raises: AirflowRuntimeError: CONNECTION_NOT_FOUND: {'conn_id': 'my_conn'}

This is the case when pure SDK is used: airflow.sdk.Connection (airflow.models will work fine as we wrap it well)

Airflow 2 raises: airflow.exceptions.AirflowNotFoundException: The conn_id my_conn isn't defined .

Now due to this, some dags / providers use this for various sorts of filtering and for catching exceptions to perform some logic. So long term, this current way of handling exceptions is not feasible. We will start seeing problems when we start migrating providers to Airflow 3 entirely. Example issues: #52838 by amazon team and also #52921 by bosch team.

Testing the solution

DAG used:

from __future__ import annotations

from airflow.models.dag import dag
from airflow.providers.standard.operators.python import PythonOperator
from airflow.sdk import Connection


def get_conn_context(**context):
    conn = context["conn"].my_conn
    return conn

def get_conn_sdk(**context):
    return Connection.get("my_conn")


def get_conn_models(**context):
    from airflow.models import Connection
    return Connection.get_connection_from_secrets("my_conn")

@dag()
def get_connection():

    a = PythonOperator(
        task_id="context_conn",
        python_callable=get_conn_context,
    )

    b = PythonOperator(
        task_id="sdk_conn",
        python_callable=get_conn_sdk,
    )

    c = PythonOperator(
        task_id="models_conn",
        python_callable=get_conn_models,
    )

    [a, b, c]


get_connection()

Without the fix:

Accessing from context(task a):

[2025-07-07, 13:41:04] INFO - DAG bundles loaded: dags-folder: source="airflow.dag_processing.bundles.manager.DagBundlesManager"
[2025-07-07, 13:41:04] INFO - Filling up the DagBag from /files/dags/get-connection.py: source="airflow.models.dagbag.DagBag"
[2025-07-07, 13:41:04] ERROR - Task failed with exception: source="task"
AirflowRuntimeError: CONNECTION_NOT_FOUND: {'conn_id': 'my_conn'}
File "/opt/airflow/task-sdk/src/airflow/sdk/execution_time/task_runner.py", line 875 in run

File "/opt/airflow/task-sdk/src/airflow/sdk/execution_time/task_runner.py", line 1162 in _execute_task

File "/opt/airflow/task-sdk/src/airflow/sdk/bases/operator.py", line 397 in wrapper

File "/opt/airflow/providers/standard/src/airflow/providers/standard/operators/python.py", line 217 in execute

File "/opt/airflow/providers/standard/src/airflow/providers/standard/operators/python.py", line 240 in execute_callable

File "/opt/airflow/task-sdk/src/airflow/sdk/execution_time/callback_runner.py", line 82 in run

File "/files/dags/get-connection.py", line 9 in get_conn_context

File "/opt/airflow/task-sdk/src/airflow/sdk/execution_time/context.py", line 275 in __getattr__

File "/opt/airflow/task-sdk/src/airflow/sdk/execution_time/context.py", line 158 in _get_connection

Accessing from Models (task c):

[2025-07-07, 13:41:04] INFO - DAG bundles loaded: dags-folder: source="airflow.dag_processing.bundles.manager.DagBundlesManager"
[2025-07-07, 13:41:04] INFO - Filling up the DagBag from /files/dags/get-connection.py: source="airflow.models.dagbag.DagBag"
[2025-07-07, 13:41:04] ERROR - Task failed with exception: source="task"
AirflowNotFoundException: The conn_id `my_conn` isn't defined
File "/opt/airflow/task-sdk/src/airflow/sdk/execution_time/task_runner.py", line 875 in run

File "/opt/airflow/task-sdk/src/airflow/sdk/execution_time/task_runner.py", line 1162 in _execute_task

File "/opt/airflow/task-sdk/src/airflow/sdk/bases/operator.py", line 397 in wrapper

File "/opt/airflow/providers/standard/src/airflow/providers/standard/operators/python.py", line 217 in execute

File "/opt/airflow/providers/standard/src/airflow/providers/standard/operators/python.py", line 240 in execute_callable

File "/opt/airflow/task-sdk/src/airflow/sdk/execution_time/callback_runner.py", line 82 in run

File "/files/dags/get-connection.py", line 18 in get_conn_models

File "/opt/airflow/airflow-core/src/airflow/models/connection.py", line 490 in get_connection_from_secrets

AirflowRuntimeError: CONNECTION_NOT_FOUND: {'conn_id': 'my_conn'}
File "/opt/airflow/airflow-core/src/airflow/models/connection.py", line 481 in get_connection_from_secrets

File "/opt/airflow/task-sdk/src/airflow/sdk/definitions/connection.py", line 152 in get

File "/opt/airflow/task-sdk/src/airflow/sdk/execution_time/context.py", line 158 in _get_connection

Accessing from SDK:

[2025-07-07, 13:41:04] INFO - DAG bundles loaded: dags-folder: source="airflow.dag_processing.bundles.manager.DagBundlesManager"
[2025-07-07, 13:41:04] INFO - Filling up the DagBag from /files/dags/get-connection.py: source="airflow.models.dagbag.DagBag"
[2025-07-07, 13:41:04] ERROR - Task failed with exception: source="task"
AirflowRuntimeError: CONNECTION_NOT_FOUND: {'conn_id': 'my_conn'}
File "/opt/airflow/task-sdk/src/airflow/sdk/execution_time/task_runner.py", line 875 in run

File "/opt/airflow/task-sdk/src/airflow/sdk/execution_time/task_runner.py", line 1162 in _execute_task

File "/opt/airflow/task-sdk/src/airflow/sdk/bases/operator.py", line 397 in wrapper

File "/opt/airflow/providers/standard/src/airflow/providers/standard/operators/python.py", line 217 in execute

File "/opt/airflow/providers/standard/src/airflow/providers/standard/operators/python.py", line 240 in execute_callable

File "/opt/airflow/task-sdk/src/airflow/sdk/execution_time/callback_runner.py", line 82 in run

File "/files/dags/get-connection.py", line 13 in get_conn_sdk

File "/opt/airflow/task-sdk/src/airflow/sdk/definitions/connection.py", line 152 in get

File "/opt/airflow/task-sdk/src/airflow/sdk/execution_time/context.py", line 158 in _get_connection

After changes:

Accessing from context (task a):

[2025-07-07, 13:51:33] INFO - DAG bundles loaded: dags-folder: source="airflow.dag_processing.bundles.manager.DagBundlesManager"
[2025-07-07, 13:51:33] INFO - Filling up the DagBag from /files/dags/get-connection.py: source="airflow.models.dagbag.DagBag"
[2025-07-07, 13:51:33] ERROR - Task failed with exception: source="task"
AirflowNotFoundException: The conn_id `my_conn` isn't defined
File "/opt/airflow/task-sdk/src/airflow/sdk/execution_time/task_runner.py", line 875 in run

File "/opt/airflow/task-sdk/src/airflow/sdk/execution_time/task_runner.py", line 1162 in _execute_task

File "/opt/airflow/task-sdk/src/airflow/sdk/bases/operator.py", line 397 in wrapper

File "/opt/airflow/providers/standard/src/airflow/providers/standard/operators/python.py", line 217 in execute

File "/opt/airflow/providers/standard/src/airflow/providers/standard/operators/python.py", line 240 in execute_callable

File "/opt/airflow/task-sdk/src/airflow/sdk/execution_time/callback_runner.py", line 82 in run

File "/files/dags/get-connection.py", line 9 in get_conn_context

File "/opt/airflow/task-sdk/src/airflow/sdk/execution_time/context.py", line 280 in __getattr__

AirflowRuntimeError: CONNECTION_NOT_FOUND: {'conn_id': 'my_conn'}
File "/opt/airflow/task-sdk/src/airflow/sdk/execution_time/context.py", line 277 in __getattr__

File "/opt/airflow/task-sdk/src/airflow/sdk/execution_time/context.py", line 159 in _get_connection

Accessing from models (task b):

[2025-07-07, 13:51:33] INFO - DAG bundles loaded: dags-folder: source="airflow.dag_processing.bundles.manager.DagBundlesManager"
[2025-07-07, 13:51:33] INFO - Filling up the DagBag from /files/dags/get-connection.py: source="airflow.models.dagbag.DagBag"
[2025-07-07, 13:51:33] ERROR - Task failed with exception: source="task"
AirflowNotFoundException: The conn_id `my_conn` isn't defined
File "/opt/airflow/task-sdk/src/airflow/sdk/execution_time/task_runner.py", line 875 in run

File "/opt/airflow/task-sdk/src/airflow/sdk/execution_time/task_runner.py", line 1162 in _execute_task

File "/opt/airflow/task-sdk/src/airflow/sdk/bases/operator.py", line 397 in wrapper

File "/opt/airflow/providers/standard/src/airflow/providers/standard/operators/python.py", line 217 in execute

File "/opt/airflow/providers/standard/src/airflow/providers/standard/operators/python.py", line 240 in execute_callable

File "/opt/airflow/task-sdk/src/airflow/sdk/execution_time/callback_runner.py", line 82 in run

File "/files/dags/get-connection.py", line 18 in get_conn_models

File "/opt/airflow/airflow-core/src/airflow/models/connection.py", line 481 in get_connection_from_secrets

File "/opt/airflow/task-sdk/src/airflow/sdk/definitions/connection.py", line 156 in get

AirflowRuntimeError: CONNECTION_NOT_FOUND: {'conn_id': 'my_conn'}
File "/opt/airflow/task-sdk/src/airflow/sdk/definitions/connection.py", line 153 in get

File "/opt/airflow/task-sdk/src/airflow/sdk/execution_time/context.py", line 159 in _get_connection

Accessing from SDK (task c):

[2025-07-07, 13:51:33] INFO - DAG bundles loaded: dags-folder: source="airflow.dag_processing.bundles.manager.DagBundlesManager"
[2025-07-07, 13:51:33] INFO - Filling up the DagBag from /files/dags/get-connection.py: source="airflow.models.dagbag.DagBag"
[2025-07-07, 13:51:33] ERROR - Task failed with exception: source="task"
AirflowNotFoundException: The conn_id `my_conn` isn't defined
File "/opt/airflow/task-sdk/src/airflow/sdk/execution_time/task_runner.py", line 875 in run

File "/opt/airflow/task-sdk/src/airflow/sdk/execution_time/task_runner.py", line 1162 in _execute_task

File "/opt/airflow/task-sdk/src/airflow/sdk/bases/operator.py", line 397 in wrapper

File "/opt/airflow/providers/standard/src/airflow/providers/standard/operators/python.py", line 217 in execute

File "/opt/airflow/providers/standard/src/airflow/providers/standard/operators/python.py", line 240 in execute_callable

File "/opt/airflow/task-sdk/src/airflow/sdk/execution_time/callback_runner.py", line 82 in run

File "/files/dags/get-connection.py", line 13 in get_conn_sdk

File "/opt/airflow/task-sdk/src/airflow/sdk/definitions/connection.py", line 156 in get

AirflowRuntimeError: CONNECTION_NOT_FOUND: {'conn_id': 'my_conn'}
File "/opt/airflow/task-sdk/src/airflow/sdk/definitions/connection.py", line 153 in get

File "/opt/airflow/task-sdk/src/airflow/sdk/execution_time/context.py", line 159 in _get_connection

^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in airflow-core/newsfragments.

@amoghrajesh
Copy link
Contributor Author

CC @jscheffl

@amoghrajesh amoghrajesh added this to the Airflow 3.1.0 milestone Jul 7, 2025
Copy link
Member

@ashb ashb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have the same try/except wrapping _get_connection in two places - is there a reason we don't handle inside that fn?

@ashb
Copy link
Member

ashb commented Jul 7, 2025

Shouldn't this be a bug fix too?

@amoghrajesh
Copy link
Contributor Author

We have the same try/except wrapping _get_connection in two places - is there a reason we don't handle inside that fn?

I checked on this and I see the general trend:

  • SDK client raised ErrorResponse with ErrorType for generic client related errors:
class ErrorType(enum.Enum):
    """Error types used in the API client."""

    CONNECTION_NOT_FOUND = "CONNECTION_NOT_FOUND"
    VARIABLE_NOT_FOUND = "VARIABLE_NOT_FOUND"
    XCOM_NOT_FOUND = "XCOM_NOT_FOUND"
    ASSET_NOT_FOUND = "ASSET_NOT_FOUND"
    DAGRUN_ALREADY_EXISTS = "DAGRUN_ALREADY_EXISTS"
    GENERIC_ERROR = "GENERIC_ERROR"
    API_SERVER_ERROR = "API_SERVER_ERROR"
  • These are captured and translated to AirflowRuntimeError in the task SDK runtime functions like connection, variables, dag run already exists etc. I guess this was done for convenience over using AirflowNotFoundException and other types of exceptions over just matching just AirflowRuntimeError?

@amoghrajesh
Copy link
Contributor Author

Shouldn't this be a bug fix too?

Added 3.1 as I thought we are past the 3.0.3 deadline, but I realise we can do 3.0.4, marked it as such.

@amoghrajesh amoghrajesh requested a review from XD-DENG as a code owner July 7, 2025 10:01
@amoghrajesh amoghrajesh requested a review from ashb July 7, 2025 10:02
@potiuk
Copy link
Member

potiuk commented Jul 7, 2025

Shouldn't this be a bug fix too?

Added 3.1 as I thought we are past the 3.0.3 deadline, but I realise we can do 3.0.4, marked it as such.

Also it's quite possible IMHO that we will 3.0.3rc4 - depending on @kaxil 's decision due to #52907

@vincbeck
Copy link
Contributor

vincbeck commented Jul 7, 2025

@ramitkataria , you might want to take a look a this one

Copy link
Contributor

@jscheffl jscheffl left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can say this resolves the reported problem - Thanks!

Copy link
Contributor

@ramitkataria ramitkataria left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good to me but I also agree with others about reducing code duplication where possible

@amoghrajesh amoghrajesh requested a review from kaxil July 8, 2025 06:28
Copy link
Contributor

@ferruzzi ferruzzi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also agree with it being a bugfix if we can still do that

@kaxil
Copy link
Member

kaxil commented Jul 8, 2025

Also agree with it being a bugfix if we can still do that

yup, we can - 3.0.4 :)

@amoghrajesh
Copy link
Contributor Author

@kaxil will merging this PR interfere with the RC-X of 3.0.3? If so, i can hold it. (Although we have the 304 milestone set)

@kaxil kaxil added the backport-to-v3-0-test Mark PR with this label to backport to v3-0-test branch label Jul 9, 2025
@kaxil
Copy link
Member

kaxil commented Jul 9, 2025

@kaxil will merging this PR interfere with the RC-X of 3.0.3? If so, i can hold it. (Although we have the 304 milestone set)

Go for it, I will workaround it :)

@amoghrajesh
Copy link
Contributor Author

Cool thanks!

@amoghrajesh amoghrajesh merged commit 5c2a2ec into apache:main Jul 9, 2025
78 checks passed
@amoghrajesh amoghrajesh deleted the better-connection-exceptions branch July 9, 2025 15:29
Copy link

github-actions bot commented Jul 9, 2025

Backport failed to create: v3-0-test. View the failure log Run details

Status Branch Result
v3-0-test Commit Link

You can attempt to backport this manually by running:

cherry_picker 5c2a2ec v3-0-test

This should apply the commit to the v3-0-test branch and leave the commit in conflict state marking
the files that need manual conflict resolution.

After you have resolved the conflicts, you can continue the backport process by running:

cherry_picker --continue

amoghrajesh added a commit to astronomer/airflow that referenced this pull request Jul 9, 2025
@amoghrajesh
Copy link
Contributor Author

Manual cherry pick here: #53093

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area:task-sdk backport-to-v3-0-test Mark PR with this label to backport to v3-0-test branch
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Do not return different exceptions when connections arent found for AF3 and AF2
8 participants