Skip to content

Commit 7d1fcfd

Browse files
jx2leejose-lehmkuhl
authored andcommitted
feat (airflowctl): transition of Variable Command (apache#50908)
* init import/export in airflowctl variables * BulkCreateActionVariableBody.action to str
1 parent a7d9f60 commit 7d1fcfd

File tree

5 files changed

+267
-37
lines changed

5 files changed

+267
-37
lines changed

airflow-ctl/docs/images/command_hashes.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,6 @@ dagrun:7b3e06a3664cc7ceb18457b4c0895532
99
jobs:806174e6c9511db669705279ed6a00b9
1010
pools:2c17a4131b6481bd8fe9120982606db2
1111
providers:d053e6f17ff271e1e08942378344d27b
12-
variables:d9001295d77adefbd68e389f6622b89a
12+
variables:cd3970589b2cb1e3ebd9a0b7f2ffdf4d
1313
version:11da98f530c37754403a87151cbe2274
1414
auth login:348c25d49128b6007ac97dae2ef7563f

airflow-ctl/docs/images/output_variables.svg

Lines changed: 44 additions & 36 deletions
Loading

airflow-ctl/src/airflowctl/ctl/cli_config.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,23 @@ def __call__(self, parser, namespace, values, option_string=None):
198198
action=Password,
199199
nargs="?",
200200
)
201+
ARG_VARIABLE_IMPORT = Arg(
202+
flags=("file",),
203+
metavar="file",
204+
help="Import variables from JSON file",
205+
)
206+
ARG_VARIABLE_ACTION_ON_EXISTING_KEY = Arg(
207+
flags=("-a", "--action-on-existing-key"),
208+
type=str,
209+
default="overwrite",
210+
help="Action to take if we encounter a variable key that already exists.",
211+
choices=("overwrite", "fail", "skip"),
212+
)
213+
ARG_VARIABLE_EXPORT = Arg(
214+
flags=("file",),
215+
metavar="file",
216+
help="Export all variables to JSON file",
217+
)
201218

202219
ARG_OUTPUT = Arg(
203220
flags=("-o", "--output"),
@@ -626,6 +643,21 @@ def merge_commands(
626643
),
627644
)
628645

646+
VARIABLE_COMMANDS = (
647+
ActionCommand(
648+
name="import",
649+
help="Import variables",
650+
func=lazy_load_command("airflowctl.ctl.commands.variable_command.import_"),
651+
args=(ARG_VARIABLE_IMPORT, ARG_VARIABLE_ACTION_ON_EXISTING_KEY),
652+
),
653+
ActionCommand(
654+
name="export",
655+
help="Export all variables",
656+
func=lazy_load_command("airflowctl.ctl.commands.variable_command.export"),
657+
args=(ARG_VARIABLE_EXPORT,),
658+
),
659+
)
660+
629661
core_commands: list[CLICommand] = [
630662
GroupCommand(
631663
name="auth",
@@ -638,6 +670,11 @@ def merge_commands(
638670
help="Manage Airflow pools",
639671
subcommands=POOL_COMMANDS,
640672
),
673+
GroupCommand(
674+
name="variables",
675+
help="Manage Airflow variables",
676+
subcommands=VARIABLE_COMMANDS,
677+
),
641678
]
642679
# Add generated group commands
643680
core_commands = merge_commands(
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
from __future__ import annotations
18+
19+
import json
20+
import os
21+
import sys
22+
from pathlib import Path
23+
24+
import rich
25+
26+
from airflow.api_fastapi.core_api.datamodels.common import BulkActionOnExistence
27+
from airflowctl.api.client import NEW_API_CLIENT, ClientKind, provide_api_client
28+
from airflowctl.api.datamodels.generated import (
29+
BulkBodyVariableBody,
30+
BulkCreateActionVariableBody,
31+
VariableBody,
32+
)
33+
34+
35+
@provide_api_client(kind=ClientKind.CLI)
36+
def import_(args, api_client=NEW_API_CLIENT):
37+
"""Import variables from a given file."""
38+
success_message = "[green]Import successful! success: {success}, errors: {errors}[/green]"
39+
if not os.path.exists(args.file):
40+
rich.print(f"[red]Missing variable file: {args.file}")
41+
sys.exit(1)
42+
with open(args.file) as var_file:
43+
try:
44+
var_json = json.load(var_file)
45+
except json.JSONDecodeError:
46+
rich.print(f"[red]Invalid variable file: {args.file}")
47+
sys.exit(1)
48+
49+
action_on_existence = BulkActionOnExistence(args.action_on_existing_key)
50+
vars_to_update = []
51+
for k, v in var_json.items():
52+
value, description = v, None
53+
if isinstance(v, dict) and v.get("value"):
54+
value, description = v["value"], v.get("description")
55+
56+
vars_to_update.append(
57+
VariableBody(
58+
key=k,
59+
value=value,
60+
description=description,
61+
)
62+
)
63+
64+
bulk_body = BulkBodyVariableBody(
65+
actions=[
66+
BulkCreateActionVariableBody(
67+
action="create",
68+
entities=vars_to_update,
69+
action_on_existence=action_on_existence,
70+
)
71+
]
72+
)
73+
result = api_client.variables.bulk(variables=bulk_body)
74+
rich.print(success_message.format(success=result.success, errors=result.errors))
75+
return result.success, result.errors
76+
77+
78+
@provide_api_client(kind=ClientKind.CLI)
79+
def export(args, api_client=NEW_API_CLIENT):
80+
"""Export all the variables to the file."""
81+
success_message = "[green]Export successful! {total_entries} variable(s) to {file}[/green]"
82+
var_dict = {}
83+
variables = api_client.variables.list()
84+
85+
for variable in variables.variables:
86+
if variable.description:
87+
var_dict[variable.key] = {
88+
"value": variable.value,
89+
"description": variable.description,
90+
}
91+
else:
92+
var_dict[variable.key] = variable.value
93+
94+
with open(Path(args.file), "w") as var_file:
95+
json.dump(var_dict, var_file, sort_keys=True, indent=4)
96+
rich.print(success_message.format(total_entries=variables.total_entries, file=args.file))

0 commit comments

Comments
 (0)