Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@
)
remote_task_handler_kwargs = {}
elif remote_base_log_folder.startswith("gs://"):
from airflow.providers.google.cloud.logs.gcs_task_handler import GCSRemoteLogIO
from airflow.providers.google.cloud.log.gcs_task_handler import GCSRemoteLogIO

key_path = conf.get_mandatory_value("logging", "google_key_path", fallback=None)

Expand Down
30 changes: 30 additions & 0 deletions airflow-core/tests/unit/core/test_logging_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,36 @@ def test_log_group_arns_remote_logging_with_cloudwatch_handler(
assert isinstance(remote_io, CloudWatchRemoteLogIO)
assert remote_io.log_group_arn == log_group_arn

def test_loading_remote_logging_with_gcs_handler(self):
"""Test if logging can be configured successfully for GCS"""
import airflow.logging_config
from airflow.config_templates import airflow_local_settings
from airflow.providers.google.cloud.log.gcs_task_handler import GCSRemoteLogIO

with conf_vars(
{
("logging", "remote_logging"): "True",
("logging", "remote_log_conn_id"): "some_gcs",
("logging", "remote_base_log_folder"): "gs://some-folder",
("logging", "google_key_path"): "/gcs-key.json",
(
"logging",
"remote_task_handler_kwargs",
): '{"delete_local_copy": true, "project_id": "test-project", "gcp_keyfile_dict": {},"scopes": ["https://www.googleapis.com/auth/devstorage.read_write"]}',
}
):
importlib.reload(airflow_local_settings)
airflow.logging_config.configure_logging()

assert isinstance(airflow.logging_config.REMOTE_TASK_LOG, GCSRemoteLogIO)
assert getattr(airflow.logging_config.REMOTE_TASK_LOG, "delete_local_copy") is True
assert getattr(airflow.logging_config.REMOTE_TASK_LOG, "project_id") == "test-project"
assert getattr(airflow.logging_config.REMOTE_TASK_LOG, "gcp_keyfile_dict") == {}
assert getattr(airflow.logging_config.REMOTE_TASK_LOG, "scopes") == [
"https://www.googleapis.com/auth/devstorage.read_write"
]
assert getattr(airflow.logging_config.REMOTE_TASK_LOG, "gcp_key_path") == "/gcs-key.json"

def test_loading_remote_logging_with_kwargs(self):
"""Test if logging can be configured successfully with kwargs"""
pytest.importorskip("airflow.providers.amazon", reason="'amazon' provider not installed")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,15 @@ class GCSRemoteLogIO(LoggingMixin): # noqa: D101
remote_base: str
base_log_folder: Path = attrs.field(converter=Path)
delete_local_copy: bool
project_id: str

gcp_key_path: str | None
gcp_keyfile_dict: dict | None
scopes: Collection[str] | None
project_id: str

def upload(self, path: os.PathLike, ti: RuntimeTI):
processors = ()

def upload(self, path: os.PathLike | str, ti: RuntimeTI):
"""Upload the given log path to the remote storage."""
path = Path(path)
if path.is_absolute():
Expand Down