diff --git a/airflow-core/src/airflow/executors/executor_loader.py b/airflow-core/src/airflow/executors/executor_loader.py index 3b637d85f147f..49c40e94389ed 100644 --- a/airflow-core/src/airflow/executors/executor_loader.py +++ b/airflow-core/src/airflow/executors/executor_loader.py @@ -164,7 +164,7 @@ def _get_team_executor_configs(cls) -> list[tuple[str | None, list[str]]]: executor_config = conf.get_mandatory_value("core", "executor") if not executor_config: raise AirflowConfigException( - "The 'executor' key in the 'coe' section of the configuration is mandatory and cannot be empty" + "The 'executor' key in the 'core' section of the configuration is mandatory and cannot be empty" ) configs: list[tuple[str | None, list[str]]] = [] # The executor_config can look like a few things. One is just a single executor name, such as @@ -182,6 +182,7 @@ def _get_team_executor_configs(cls) -> list[tuple[str | None, list[str]]]: # Split by comma to get the individual executor names and strip spaces off of them configs.append((None, [name.strip() for name in team_executor_config.split(",")])) else: + cls.block_use_of_multi_team() team_name, executor_names = team_executor_config.split("=") configs.append((team_name, [name.strip() for name in executor_names.split(",")])) return configs diff --git a/airflow-core/tests/unit/executors/test_executor_loader.py b/airflow-core/tests/unit/executors/test_executor_loader.py index e49db7d609bb1..d3495752880e7 100644 --- a/airflow-core/tests/unit/executors/test_executor_loader.py +++ b/airflow-core/tests/unit/executors/test_executor_loader.py @@ -48,7 +48,7 @@ def test_empty_executor_configured(self): with conf_vars({("core", "executor"): ""}): with pytest.raises( AirflowConfigException, - match="The 'executor' key in the 'coe' section of the configuration is mandatory and cannot be empty", + match="The 'executor' key in the 'core' section of the configuration is mandatory and cannot be empty", ): executor_loader.ExecutorLoader.get_default_executor() @@ -245,9 +245,20 @@ def test_should_support_custom_path(self): ], ) def test_get_hybrid_executors_from_configs(self, executor_config, expected_executors_list): - with conf_vars({("core", "executor"): executor_config}): - executors = executor_loader.ExecutorLoader._get_executor_names() - assert executors == expected_executors_list + # Mock the blocking method for tests that involve actual team configurations + with mock.patch.object(executor_loader.ExecutorLoader, "block_use_of_multi_team"): + with conf_vars({("core", "executor"): executor_config}): + executors = executor_loader.ExecutorLoader._get_executor_names() + assert executors == expected_executors_list + + def test_get_multi_team_executors_from_config_blocked_by_default(self): + """By default the use of multiple team based executors is blocked for now.""" + with conf_vars({("core", "executor"): "=CeleryExecutor;team_a=CeleryExecutor;team_b=LocalExecutor"}): + with pytest.raises( + AirflowConfigException, + match=r".*Configuring multiple team based executors is not yet supported!.*", + ): + executor_loader.ExecutorLoader._get_executor_names() def test_init_executors(self): from airflow.providers.celery.executors.celery_executor import CeleryExecutor diff --git a/devel-common/src/tests_common/test_utils/executor_loader.py b/devel-common/src/tests_common/test_utils/executor_loader.py index f7dd98b726428..6374b381e1df5 100644 --- a/devel-common/src/tests_common/test_utils/executor_loader.py +++ b/devel-common/src/tests_common/test_utils/executor_loader.py @@ -29,6 +29,6 @@ def clean_executor_loader_module(): """Clean the executor_loader state, as it stores global variables in the module, causing side effects for some tests.""" executor_loader._alias_to_executors: dict[str, ExecutorName] = {} executor_loader._module_to_executors: dict[str, ExecutorName] = {} - executor_loader._team_id_to_executors: dict[str | None, ExecutorName] = {} + executor_loader._team_name_to_executors: dict[str | None, ExecutorName] = {} executor_loader._classname_to_executors: dict[str, ExecutorName] = {} executor_loader._executor_names: list[ExecutorName] = []