From 29058f3ebca268d0df6347b8a3715e36f227289f Mon Sep 17 00:00:00 2001 From: Pavan Kumar Date: Fri, 25 Apr 2025 15:03:32 +0100 Subject: [PATCH 1/5] Fix gcp remote log module import in airflow local settings --- .../airflow_local_settings.py | 5 ++-- .../tests/unit/core/test_logging_config.py | 30 +++++++++++++++++++ 2 files changed, 33 insertions(+), 2 deletions(-) diff --git a/airflow-core/src/airflow/config_templates/airflow_local_settings.py b/airflow-core/src/airflow/config_templates/airflow_local_settings.py index 319cdc972593c..2123e12bce6ed 100644 --- a/airflow-core/src/airflow/config_templates/airflow_local_settings.py +++ b/airflow-core/src/airflow/config_templates/airflow_local_settings.py @@ -147,6 +147,7 @@ f"{type(remote_task_handler_kwargs)}" ) delete_local_copy = conf.getboolean("logging", "delete_local_logs") + print(delete_local_copy) if remote_base_log_folder.startswith("s3://"): from airflow.providers.amazon.aws.log.s3_task_handler import S3RemoteLogIO @@ -180,7 +181,7 @@ ) remote_task_handler_kwargs = {} elif remote_base_log_folder.startswith("gs://"): - from airflow.providers.google.cloud.logs.gcs_task_handler import GCSRemoteLogIO + from airflow.providers.google.cloud.log.gcs_task_handler import GCSRemoteLogIO key_path = conf.get_mandatory_value("logging", "google_key_path", fallback=None) @@ -194,7 +195,7 @@ } | remote_task_handler_kwargs ) - ) + ) # type: ignore[assignment] remote_task_handler_kwargs = {} elif remote_base_log_folder.startswith("wasb"): from airflow.providers.microsoft.azure.log.wasb_task_handler import WasbRemoteLogIO diff --git a/airflow-core/tests/unit/core/test_logging_config.py b/airflow-core/tests/unit/core/test_logging_config.py index b19e7d9bbe7d6..72864b066b34a 100644 --- a/airflow-core/tests/unit/core/test_logging_config.py +++ b/airflow-core/tests/unit/core/test_logging_config.py @@ -295,6 +295,36 @@ def test_log_group_arns_remote_logging_with_cloudwatch_handler( assert isinstance(remote_io, CloudWatchRemoteLogIO) assert remote_io.log_group_arn == log_group_arn + def test_loading_remote_logging_with_gcs_handler(self): + """Test if logging can be configured successfully for GCS""" + import airflow.logging_config + from airflow.config_templates import airflow_local_settings + from airflow.providers.google.cloud.log.gcs_task_handler import GCSRemoteLogIO + + with conf_vars( + { + ("logging", "remote_logging"): "True", + ("logging", "remote_log_conn_id"): "some_gcs", + ("logging", "remote_base_log_folder"): "gs://some-folder", + ("logging", "google_key_path"): "/gcs-key.json", + ( + "logging", + "remote_task_handler_kwargs", + ): '{"delete_local_copy": true, "project_id": "test-project", "gcp_keyfile_dict": {},"scopes": ["https://www.googleapis.com/auth/devstorage.read_write"]}', + } + ): + importlib.reload(airflow_local_settings) + airflow.logging_config.configure_logging() + + assert isinstance(airflow.logging_config.REMOTE_TASK_LOG, GCSRemoteLogIO) + assert getattr(airflow.logging_config.REMOTE_TASK_LOG, "delete_local_copy") is True + assert getattr(airflow.logging_config.REMOTE_TASK_LOG, "project_id") == "test-project" + assert getattr(airflow.logging_config.REMOTE_TASK_LOG, "gcp_keyfile_dict") == {} + assert getattr(airflow.logging_config.REMOTE_TASK_LOG, "scopes") == [ + "https://www.googleapis.com/auth/devstorage.read_write" + ] + assert getattr(airflow.logging_config.REMOTE_TASK_LOG, "gcp_key_path") == "/gcs-key.json" + def test_loading_remote_logging_with_kwargs(self): """Test if logging can be configured successfully with kwargs""" pytest.importorskip("airflow.providers.amazon", reason="'amazon' provider not installed") From 0e738cbcf1a4d0c462a4836e358622b398c4da8c Mon Sep 17 00:00:00 2001 From: Pavan Kumar Date: Fri, 25 Apr 2025 15:07:23 +0100 Subject: [PATCH 2/5] Fix gcp remote log module import in airflow local settings --- .../src/airflow/config_templates/airflow_local_settings.py | 1 - 1 file changed, 1 deletion(-) diff --git a/airflow-core/src/airflow/config_templates/airflow_local_settings.py b/airflow-core/src/airflow/config_templates/airflow_local_settings.py index 2123e12bce6ed..d314d91820c76 100644 --- a/airflow-core/src/airflow/config_templates/airflow_local_settings.py +++ b/airflow-core/src/airflow/config_templates/airflow_local_settings.py @@ -147,7 +147,6 @@ f"{type(remote_task_handler_kwargs)}" ) delete_local_copy = conf.getboolean("logging", "delete_local_logs") - print(delete_local_copy) if remote_base_log_folder.startswith("s3://"): from airflow.providers.amazon.aws.log.s3_task_handler import S3RemoteLogIO From d21c18d315084aed0ac44ab5687759eae1affa58 Mon Sep 17 00:00:00 2001 From: Pavan Kumar Date: Fri, 25 Apr 2025 16:29:57 +0100 Subject: [PATCH 3/5] Remove assignment ignore --- .../src/airflow/config_templates/airflow_local_settings.py | 2 +- .../airflow/providers/google/cloud/log/gcs_task_handler.py | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/airflow-core/src/airflow/config_templates/airflow_local_settings.py b/airflow-core/src/airflow/config_templates/airflow_local_settings.py index d314d91820c76..3cade5678261c 100644 --- a/airflow-core/src/airflow/config_templates/airflow_local_settings.py +++ b/airflow-core/src/airflow/config_templates/airflow_local_settings.py @@ -194,7 +194,7 @@ } | remote_task_handler_kwargs ) - ) # type: ignore[assignment] + ) remote_task_handler_kwargs = {} elif remote_base_log_folder.startswith("wasb"): from airflow.providers.microsoft.azure.log.wasb_task_handler import WasbRemoteLogIO diff --git a/providers/google/src/airflow/providers/google/cloud/log/gcs_task_handler.py b/providers/google/src/airflow/providers/google/cloud/log/gcs_task_handler.py index c598e02ba4a30..776d5d5290d5f 100644 --- a/providers/google/src/airflow/providers/google/cloud/log/gcs_task_handler.py +++ b/providers/google/src/airflow/providers/google/cloud/log/gcs_task_handler.py @@ -61,11 +61,13 @@ class GCSRemoteLogIO(LoggingMixin): # noqa: D101 remote_base: str base_log_folder: Path = attrs.field(converter=Path) delete_local_copy: bool + project_id: str gcp_key_path: str | None gcp_keyfile_dict: dict | None scopes: Collection[str] | None - project_id: str + + processors = () def upload(self, path: os.PathLike, ti: RuntimeTI): """Upload the given log path to the remote storage.""" From d45e8e9d4054a19487acbbf36fb52446564d60c9 Mon Sep 17 00:00:00 2001 From: Pavan Kumar Date: Fri, 25 Apr 2025 16:52:37 +0100 Subject: [PATCH 4/5] Remove assignment ignore --- .../src/airflow/config_templates/airflow_local_settings.py | 1 + .../src/airflow/providers/google/cloud/log/gcs_task_handler.py | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/airflow-core/src/airflow/config_templates/airflow_local_settings.py b/airflow-core/src/airflow/config_templates/airflow_local_settings.py index 3cade5678261c..e1038c355d91f 100644 --- a/airflow-core/src/airflow/config_templates/airflow_local_settings.py +++ b/airflow-core/src/airflow/config_templates/airflow_local_settings.py @@ -190,6 +190,7 @@ "base_log_folder": BASE_LOG_FOLDER, "remote_base": remote_base_log_folder, "delete_local_copy": delete_local_copy, + "gcp_key_path": key_path, } | remote_task_handler_kwargs diff --git a/providers/google/src/airflow/providers/google/cloud/log/gcs_task_handler.py b/providers/google/src/airflow/providers/google/cloud/log/gcs_task_handler.py index 776d5d5290d5f..da422b445d029 100644 --- a/providers/google/src/airflow/providers/google/cloud/log/gcs_task_handler.py +++ b/providers/google/src/airflow/providers/google/cloud/log/gcs_task_handler.py @@ -56,7 +56,7 @@ logger = logging.getLogger(__name__) -@attrs.define +@attrs.define(kw_only=True) class GCSRemoteLogIO(LoggingMixin): # noqa: D101 remote_base: str base_log_folder: Path = attrs.field(converter=Path) From 1e92e5a7091ef52d4e732fa43cbfa6ef651fb320 Mon Sep 17 00:00:00 2001 From: Pavan Kumar Date: Fri, 25 Apr 2025 17:10:27 +0100 Subject: [PATCH 5/5] Fix mypy --- .../src/airflow/config_templates/airflow_local_settings.py | 1 - .../airflow/providers/google/cloud/log/gcs_task_handler.py | 4 ++-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/airflow-core/src/airflow/config_templates/airflow_local_settings.py b/airflow-core/src/airflow/config_templates/airflow_local_settings.py index e1038c355d91f..3cade5678261c 100644 --- a/airflow-core/src/airflow/config_templates/airflow_local_settings.py +++ b/airflow-core/src/airflow/config_templates/airflow_local_settings.py @@ -190,7 +190,6 @@ "base_log_folder": BASE_LOG_FOLDER, "remote_base": remote_base_log_folder, "delete_local_copy": delete_local_copy, - "gcp_key_path": key_path, } | remote_task_handler_kwargs diff --git a/providers/google/src/airflow/providers/google/cloud/log/gcs_task_handler.py b/providers/google/src/airflow/providers/google/cloud/log/gcs_task_handler.py index da422b445d029..249cb80aa852f 100644 --- a/providers/google/src/airflow/providers/google/cloud/log/gcs_task_handler.py +++ b/providers/google/src/airflow/providers/google/cloud/log/gcs_task_handler.py @@ -56,7 +56,7 @@ logger = logging.getLogger(__name__) -@attrs.define(kw_only=True) +@attrs.define class GCSRemoteLogIO(LoggingMixin): # noqa: D101 remote_base: str base_log_folder: Path = attrs.field(converter=Path) @@ -69,7 +69,7 @@ class GCSRemoteLogIO(LoggingMixin): # noqa: D101 processors = () - def upload(self, path: os.PathLike, ti: RuntimeTI): + def upload(self, path: os.PathLike | str, ti: RuntimeTI): """Upload the given log path to the remote storage.""" path = Path(path) if path.is_absolute():