Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1636,6 +1636,7 @@ repos:
^airflow-core/src/airflow/api/common/mark_tasks\.py$|
^airflow-core/src/airflow/api_fastapi/core_api/datamodels/assets\.py$|
^airflow-core/src/airflow/api_fastapi/core_api/datamodels/connections\.py$|
^airflow-core/src/airflow/api_fastapi/core_api/services/public/connections\.py$|
^airflow-core/src/airflow/api_fastapi/core_api/datamodels/hitl\.py$|
^airflow-core/src/airflow/api_fastapi/core_api/datamodels/variables\.py$|
^airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid\.py$|
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

from __future__ import annotations

import json

from fastapi import HTTPException, status
from pydantic import ValidationError
from sqlalchemy import select
Expand All @@ -32,6 +34,7 @@
from airflow.api_fastapi.core_api.datamodels.connections import ConnectionBody
from airflow.api_fastapi.core_api.services.public.common import BulkService
from airflow.models.connection import Connection
from airflow.sdk.execution_time.secrets_masker import merge


def update_orm_from_pydantic(
Expand All @@ -56,11 +59,23 @@ def update_orm_from_pydantic(
if (not update_mask and "password" in pydantic_conn.model_fields_set) or (
update_mask and "password" in update_mask
):
orm_conn.set_password(pydantic_conn.password)
if pydantic_conn.password is None:
orm_conn.set_password(pydantic_conn.password)
else:
merged_password = merge(pydantic_conn.password, orm_conn.password, "password")
orm_conn.set_password(merged_password)
if (not update_mask and "extra" in pydantic_conn.model_fields_set) or (
update_mask and "extra" in update_mask
):
orm_conn.set_extra(pydantic_conn.extra)
if pydantic_conn.extra is None or orm_conn.extra is None:
orm_conn.set_extra(pydantic_conn.extra)
return
try:
merged_extra = merge(json.loads(pydantic_conn.extra), json.loads(orm_conn.extra))
orm_conn.set_extra(json.dumps(merged_extra))
except json.JSONDecodeError:
# We can't merge fields in an unstructured `extra`
orm_conn.set_extra(pydantic_conn.extra)


class BulkConnectionService(BulkService[ConnectionBody]):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
"extraFields": "Extra Fields",
"extraFieldsJson": "Extra Fields JSON",
"helperText": "Connection type missing? Make sure you have installed the corresponding Airflow Providers Package.",
"helperTextForRedactedFields": "Redacted fields ('***') will remain unchanged if not modified.",
"selectConnectionType": "Select Connection Type",
"standardFields": "Standard Fields"
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,14 @@ export type FlexibleFormProps = {
initialParamsDict: { paramsDict: ParamsSpec };
key?: string;
setError: (error: boolean) => void;
subHeader?: string;
};

export const FlexibleForm = ({
flexibleFormDefaultSection,
initialParamsDict,
setError,
subHeader,
}: FlexibleFormProps) => {
const { paramsDict: params, setInitialParamDict, setParamsDict } = useParamStore();
const processedSections = new Map();
Expand Down Expand Up @@ -126,6 +128,11 @@ export const FlexibleForm = ({

<Accordion.ItemContent pt={0}>
<Accordion.ItemBody>
{Boolean(subHeader) ? (
<Text color="fg.muted" fontSize="xs" mb={2}>
{subHeader}
</Text>
) : undefined}
<Stack separator={<StackSeparator />}>
{Object.entries(params)
.filter(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,15 @@ import type { ConnectionBody } from "./Connections";
type AddConnectionFormProps = {
readonly error: unknown;
readonly initialConnection: ConnectionBody;
readonly isEditMode?: boolean;
readonly isPending: boolean;
readonly mutateConnection: (requestBody: ConnectionBody) => void;
};

const ConnectionForm = ({
error,
initialConnection,
isEditMode = false,
isPending,
mutateConnection,
}: AddConnectionFormProps) => {
Expand Down Expand Up @@ -202,6 +204,7 @@ const ConnectionForm = ({
initialParamsDict={paramsDic}
key={selectedConnType}
setError={setFormErrors}
subHeader={isEditMode ? translate("connections.form.helperTextForRedactedFields") : undefined}
/>
<Accordion.Item key="extraJson" value="extraJson">
<Accordion.ItemTrigger cursor="button">
Expand All @@ -220,6 +223,11 @@ const ConnectionForm = ({
}}
/>
{Boolean(errors.conf) ? <Field.ErrorText>{errors.conf}</Field.ErrorText> : undefined}
{isEditMode ? (
<Field.HelperText>
{translate("connections.form.helperTextForRedactedFields")}
</Field.HelperText>
) : undefined}
</Field.Root>
)}
/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ const EditConnectionButton = ({ connection, disabled }: Props) => {
<ConnectionForm
error={error}
initialConnection={initialConnectionValue}
isEditMode
isPending={isPending}
mutateConnection={editConnection}
/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,27 @@ class TestPatchConnection(TestConnectionEndpoint):
"schema": None,
},
),
(
# Sensitive "***" should be ignored.
{
"connection_id": TEST_CONN_ID,
"conn_type": TEST_CONN_TYPE,
"port": 80,
"login": "test_login_patch",
"password": "***",
},
{
"conn_type": TEST_CONN_TYPE,
"connection_id": TEST_CONN_ID,
"description": TEST_CONN_DESCRIPTION,
"extra": None,
"host": TEST_CONN_HOST,
"login": "test_login_patch",
"password": None,
"port": 80,
"schema": None,
},
),
(
{
"connection_id": TEST_CONN_ID,
Expand Down
119 changes: 118 additions & 1 deletion task-sdk/src/airflow/sdk/execution_time/secrets_masker.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from enum import Enum
from functools import cache, cached_property
from re import Pattern
from typing import Any, TextIO, TypeAlias, TypeVar
from typing import Any, TextIO, TypeAlias, TypeVar, overload

from airflow import settings

Expand Down Expand Up @@ -116,6 +116,27 @@ def redact(value: Redactable, name: str | None = None, max_depth: int | None = N
return _secrets_masker().redact(value, name, max_depth)


@overload
def merge(new_value: str, old_value: str, name: str | None = None, max_depth: int | None = None) -> str: ...


@overload
def merge(new_value: dict, old_value: dict, name: str | None = None, max_depth: int | None = None) -> str: ...


def merge(
new_value: Redacted, old_value: Redactable, name: str | None = None, max_depth: int | None = None
) -> Redacted:
"""
Merge a redacted value with its original unredacted counterpart.

Takes a user-modified redacted value and merges it with the original unredacted value.
For sensitive fields that still contain "***" (unchanged), the original value is restored.
For fields that have been updated by the user, the new value is preserved.
"""
return _secrets_masker().merge(new_value, old_value, name, max_depth)


@cache
def _secrets_masker() -> SecretsMasker:
for flt in logging.getLogger("airflow.task").filters:
Expand Down Expand Up @@ -292,6 +313,83 @@ def _redact(self, item: Redactable, name: str | None, depth: int, max_depth: int
)
return item

def _merge(
self,
new_item: Redacted,
old_item: Redactable,
name: str | None,
depth: int,
max_depth: int,
force_sensitive: bool = False,
) -> Redacted:
"""Merge a redacted item with its original unredacted counterpart."""
if depth > max_depth:
if isinstance(new_item, str) and new_item == "***":
return old_item
return new_item

try:
# Determine if we should treat this as sensitive
is_sensitive = force_sensitive or (name is not None and should_hide_value_for_key(name))

if isinstance(new_item, dict) and isinstance(old_item, dict):
merged = {}
for key in new_item.keys():
if key in old_item:
# For dicts, pass the key as name unless we're in sensitive mode
child_name = None if is_sensitive else key
merged[key] = self._merge(
new_item[key],
old_item[key],
name=child_name,
depth=depth + 1,
max_depth=max_depth,
force_sensitive=is_sensitive,
)
else:
merged[key] = new_item[key]
return merged

if isinstance(new_item, (list, tuple)) and type(old_item) is type(new_item):
merged_list = []
for i in range(len(new_item)):
if i < len(old_item):
# In sensitive mode, check if individual item is redacted
if is_sensitive and isinstance(new_item[i], str) and new_item[i] == "***":
merged_list.append(old_item[i])
else:
merged_list.append(
self._merge(
new_item[i],
old_item[i],
name=None,
depth=depth + 1,
max_depth=max_depth,
force_sensitive=is_sensitive,
)
)
else:
merged_list.append(new_item[i])

if isinstance(new_item, list):
return list(merged_list)
return tuple(merged_list)

if isinstance(new_item, set) and isinstance(old_item, set):
# Sets are unordered, we cannot restore original items.
return new_item

if _is_v1_env_var(new_item) and _is_v1_env_var(old_item):
# TODO: Handle Kubernetes V1EnvVar objects if needed
return new_item

if is_sensitive and isinstance(new_item, str) and new_item == "***":
return old_item
return new_item

except (TypeError, AttributeError, ValueError):
return new_item

def redact(self, item: Redactable, name: str | None = None, max_depth: int | None = None) -> Redacted:
"""
Redact an any secrets found in ``item``, if it is a string.
Expand All @@ -302,6 +400,25 @@ def redact(self, item: Redactable, name: str | None = None, max_depth: int | Non
"""
return self._redact(item, name, depth=0, max_depth=max_depth or self.MAX_RECURSION_DEPTH)

def merge(
self, new_item: Redacted, old_item: Redactable, name: str | None = None, max_depth: int | None = None
) -> Redacted:
"""
Merge a redacted item with its original unredacted counterpart.

Takes a user-modified redacted item and merges it with the original unredacted item.
For sensitive fields that still contain "***" (unchanged), the original value is restored.
For fields that have been updated, the new value is preserved.
"""
return self._merge(
new_item,
old_item,
name=name,
depth=0,
max_depth=max_depth or self.MAX_RECURSION_DEPTH,
force_sensitive=False,
)

@cached_property
def _mask_adapter(self) -> None | Callable:
"""
Expand Down
Loading