Skip to content

Commit 33cf422

Browse files
amoghrajeshkaxil
authored andcommitted
[v3-0-test] Unify connection not found exceptions between AF2 and AF3 (#52968) (#53093)
1 parent 9359b88 commit 33cf422

File tree

5 files changed

+23
-8
lines changed

5 files changed

+23
-8
lines changed

airflow-core/src/airflow/models/connection.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -478,8 +478,7 @@ def get_connection_from_secrets(cls, conn_id: str) -> Connection:
478478
return conn
479479
except AirflowRuntimeError as e:
480480
if e.error.error == ErrorType.CONNECTION_NOT_FOUND:
481-
log.debug("Unable to retrieve connection from MetastoreBackend using Task SDK")
482-
raise AirflowNotFoundException(f"The conn_id `{conn_id}` isn't defined")
481+
raise AirflowNotFoundException(f"The conn_id `{conn_id}` isn't defined") from None
483482
raise
484483

485484
# check cache first

airflow-core/tests/unit/dag_processing/test_processor.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -324,7 +324,7 @@ def dag_in_a_fn():
324324
assert result is not None
325325
assert result.import_errors != {}
326326
if result.import_errors:
327-
assert "CONNECTION_NOT_FOUND" in next(iter(result.import_errors.values()))
327+
assert "The conn_id `my_conn` isn't defined" in next(iter(result.import_errors.values()))
328328

329329
def test_import_module_in_bundle_root(self, tmp_path: pathlib.Path, inprocess_client):
330330
tmp_path.joinpath("util.py").write_text("NAME = 'dag_name'")

task-sdk/src/airflow/sdk/definitions/connection.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,8 @@
2424

2525
import attrs
2626

27-
from airflow.exceptions import AirflowException
27+
from airflow.exceptions import AirflowException, AirflowNotFoundException
28+
from airflow.sdk.exceptions import AirflowRuntimeError, ErrorType
2829

2930
log = logging.getLogger(__name__)
3031

@@ -139,7 +140,12 @@ def get_hook(self, *, hook_params=None):
139140
def get(cls, conn_id: str) -> Any:
140141
from airflow.sdk.execution_time.context import _get_connection
141142

142-
return _get_connection(conn_id)
143+
try:
144+
return _get_connection(conn_id)
145+
except AirflowRuntimeError as e:
146+
if e.error.error == ErrorType.CONNECTION_NOT_FOUND:
147+
raise AirflowNotFoundException(f"The conn_id `{conn_id}` isn't defined") from None
148+
raise
143149

144150
@property
145151
def extra_dejson(self) -> dict:

task-sdk/src/airflow/sdk/execution_time/context.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -272,7 +272,9 @@ class ConnectionAccessor:
272272
"""Wrapper to access Connection entries in template."""
273273

274274
def __getattr__(self, conn_id: str) -> Any:
275-
return _get_connection(conn_id)
275+
from airflow.sdk.definitions.connection import Connection
276+
277+
return Connection.get(conn_id)
276278

277279
def __repr__(self) -> str:
278280
return "<ConnectionAccessor (dynamic access)>"

task-sdk/tests/task_sdk/definitions/test_connections.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,10 @@
2323
import pytest
2424

2525
from airflow.configuration import initialize_secrets_backends
26-
from airflow.exceptions import AirflowException
26+
from airflow.exceptions import AirflowException, AirflowNotFoundException
2727
from airflow.sdk import Connection
28-
from airflow.sdk.execution_time.comms import ConnectionResult
28+
from airflow.sdk.exceptions import ErrorType
29+
from airflow.sdk.execution_time.comms import ConnectionResult, ErrorResponse
2930
from airflow.secrets import DEFAULT_SECRETS_SEARCH_PATH_WORKERS
3031

3132
from tests_common.test_utils.config import conf_vars
@@ -121,6 +122,13 @@ def test_conn_get(self, mock_supervisor_comms):
121122
extra=None,
122123
)
123124

125+
def test_conn_get_not_found(self, mock_supervisor_comms):
126+
error_response = ErrorResponse(error=ErrorType.CONNECTION_NOT_FOUND)
127+
mock_supervisor_comms.send.return_value = error_response
128+
129+
with pytest.raises(AirflowNotFoundException, match="The conn_id `mysql_conn` isn't defined"):
130+
_ = Connection.get(conn_id="mysql_conn")
131+
124132

125133
class TestConnectionsFromSecrets:
126134
def test_get_connection_secrets_backend(self, mock_supervisor_comms, tmp_path):

0 commit comments

Comments
 (0)