Skip to content

Commit 8f99e25

Browse files
insomneskaxil
authored and committed
Add @task.kubernetes_cmd (#46913)
closes: #46414 (cherry picked from commit 75140f6)
1 parent 95adaec commit 8f99e25

File tree

9 files changed

+1165
-285
lines changed

9 files changed

+1165
-285
lines changed

providers/cncf/kubernetes/docs/operators.rst

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,40 @@ Also for this action you can use operator in the deferrable mode:
182182
:start-after: [START howto_operator_k8s_write_xcom_async]
183183
:end-before: [END howto_operator_k8s_write_xcom_async]
184184

185+
186+
Run command in KubernetesPodOperator from TaskFlow
187+
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
188+
With the usage of the ``@task.kubernetes_cmd`` decorator, you can run a command returned by a function
189+
in a ``KubernetesPodOperator``, simplifying its connection to TaskFlow.
190+
191+
Difference between ``@task.kubernetes`` and ``@task.kubernetes_cmd``
192+
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
193+
``@task.kubernetes`` decorator is designed to run a Python function inside a Kubernetes pod using KPO.
194+
It does this by serializing the function into a temporary Python script that is executed inside the container.
195+
This is well-suited for cases where you want to isolate Python code execution and manage complex dependencies,
196+
as described in the :doc:`TaskFlow documentation <apache-airflow:tutorial/taskflow>`.
197+
198+
In contrast, ``@task.kubernetes_cmd`` decorator allows the decorated function to return
199+
a shell command (as a list of strings), which is then passed as ``cmds`` or ``arguments`` to
200+
``KubernetesPodOperator``.
201+
This enables executing arbitrary commands available inside a Kubernetes pod --
202+
without needing to wrap them in Python code.
203+
204+
A key benefit here is that Python excels at composing and templating these commands.
205+
Shell commands can be dynamically generated using Python's string formatting, templating,
206+
extra function calls and logic. This makes it a flexible tool for orchestrating complex pipelines
207+
where the task is to invoke CLI-based operations in containers without the need to leave
208+
a TaskFlow context.
209+
210+
How does this decorator work?
211+
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
212+
See the following examples on how the decorator works:
213+
214+
.. exampleinclude:: /../tests/system/cncf/kubernetes/example_kubernetes_cmd_decorator.py
215+
:language: python
216+
:start-after: [START howto_decorator_kubernetes_cmd]
217+
:end-before: [END howto_decorator_kubernetes_cmd]
218+
185219
Include error message in email alert
186220
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
187221

providers/cncf/kubernetes/provider.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,8 @@ connection-types:
146146
task-decorators:
147147
- class-name: airflow.providers.cncf.kubernetes.decorators.kubernetes.kubernetes_task
148148
name: kubernetes
149+
- class-name: airflow.providers.cncf.kubernetes.decorators.kubernetes_cmd.kubernetes_cmd_task
150+
name: kubernetes_cmd
149151

150152
config:
151153
local_kubernetes_executor:
Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
from __future__ import annotations
18+
19+
import warnings
20+
from collections.abc import Sequence
21+
from typing import TYPE_CHECKING, Callable
22+
23+
from airflow.providers.cncf.kubernetes.version_compat import AIRFLOW_V_3_0_PLUS
24+
25+
if AIRFLOW_V_3_0_PLUS:
26+
from airflow.sdk.bases.decorator import DecoratedOperator, TaskDecorator, task_decorator_factory
27+
else:
28+
from airflow.decorators.base import ( # type: ignore[no-redef]
29+
DecoratedOperator,
30+
TaskDecorator,
31+
task_decorator_factory,
32+
)
33+
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
34+
from airflow.utils.context import context_merge
35+
from airflow.utils.operator_helpers import determine_kwargs
36+
37+
if TYPE_CHECKING:
38+
from airflow.utils.context import Context
39+
40+
41+
class _KubernetesCmdDecoratedOperator(DecoratedOperator, KubernetesPodOperator):
    """
    Operator behind the ``@task.kubernetes_cmd`` decorator.

    At execution time the decorated callable is invoked and the list of strings
    it returns is assigned to the pod's ``cmds`` (or to ``arguments`` when
    ``args_only`` is true) before the pod operator's ``execute`` runs.

    :param python_callable: Callable that returns the command as a list of strings.
    :param args_only: When true, the returned list is used as ``arguments`` and
        ``cmds`` is left empty; otherwise the list is used as ``cmds``.
    """

    custom_operator_name = "@task.kubernetes_cmd"

    template_fields: Sequence[str] = KubernetesPodOperator.template_fields
    overwrite_rtif_after_execution: bool = True

    def __init__(self, *, python_callable: Callable, args_only: bool = False, **kwargs) -> None:
        self.args_only = args_only

        # ``cmds`` / ``arguments`` are always produced by the callable, so any
        # value supplied through the decorator is dropped (with a warning).
        user_cmds = kwargs.pop("cmds", None)
        user_arguments = kwargs.pop("arguments", None)
        if not (user_cmds is None and user_arguments is None):
            warnings.warn(
                f"The `cmds` and `arguments` are unused in {self.custom_operator_name} decorator. "
                "You should return a list of commands or image entrypoint arguments with "
                "args_only=True from the python_callable.",
                UserWarning,
                stacklevel=3,
            )

        # Without an explicit name, derive one from the callable; a random
        # suffix is requested by default to avoid name collisions.
        pod_kwargs = {
            "name": kwargs.pop("name", f"k8s-airflow-pod-{python_callable.__name__}"),
            "random_name_suffix": kwargs.pop("random_name_suffix", True),
            "cmds": None,
            "arguments": None,
        }

        super().__init__(python_callable=python_callable, **pod_kwargs, **kwargs)

    def execute(self, context: Context):
        """Build the command from the callable, assign it to the pod, then run the pod."""
        command = self._generate_cmds(context)
        self.cmds, self.arguments = ([], command) if self.args_only else (command, [])
        # Templates are rendered again after the assignment above -- presumably so
        # that template expressions inside the returned command get rendered too.
        context["ti"].render_templates()  # type: ignore[attr-defined]
        return super().execute(context)

    def _generate_cmds(self, context: Context) -> list[str]:
        """
        Call the decorated function and validate what it returns.

        :param context: Task context; ``op_kwargs`` are merged into it before the call.
        :return: The non-empty list of strings returned by the callable.
        :raises TypeError: If the result is not a list, or contains a non-string item.
        :raises ValueError: If the result is an empty list.
        """
        context_merge(context, self.op_kwargs)
        call_kwargs = determine_kwargs(self.python_callable, self.op_args, context)
        result = self.python_callable(*self.op_args, **call_kwargs)
        func_name = self.python_callable.__name__

        if not isinstance(result, list):
            raise TypeError(f"Expected python_callable to return a list of strings, but got {type(result)}")
        if any(not isinstance(item, str) for item in result):
            raise TypeError(f"Expected {func_name} to return a list of strings, but got {result}")
        if not result:
            raise ValueError(f"The {func_name} returned an empty list of commands")

        return result
102+
103+
104+
def kubernetes_cmd_task(
    python_callable: Callable | None = None,
    **kwargs,
) -> TaskDecorator:
    """
    Kubernetes cmd operator decorator.

    The decorated function must return the command to run in K8s through
    KubernetesPodOperator, as a list of strings. With ``args_only=True`` the
    returned list is instead used as the arguments for the container's default
    command. Any other keyword argument is forwarded via ``kwargs``, so
    KubernetesPodOperator arguments can be passed here. Can be reused in a
    single DAG.

    :param python_callable: Function to decorate
    :param kwargs: Extra keyword arguments forwarded to the task decorator factory
    """
    decorator = task_decorator_factory(
        decorated_operator_class=_KubernetesCmdDecoratedOperator,
        python_callable=python_callable,
        **kwargs,
    )
    return decorator

providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/get_provider_info.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,11 @@ def get_provider_info():
8585
{
8686
"class-name": "airflow.providers.cncf.kubernetes.decorators.kubernetes.kubernetes_task",
8787
"name": "kubernetes",
88-
}
88+
},
89+
{
90+
"class-name": "airflow.providers.cncf.kubernetes.decorators.kubernetes_cmd.kubernetes_cmd_task",
91+
"name": "kubernetes_cmd",
92+
},
8993
],
9094
"config": {
9195
"local_kubernetes_executor": {
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one
3+
# or more contributor license agreements. See the NOTICE file
4+
# distributed with this work for additional information
5+
# regarding copyright ownership. The ASF licenses this file
6+
# to you under the Apache License, Version 2.0 (the
7+
# "License"); you may not use this file except in compliance
8+
# with the License. You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing,
13+
# software distributed under the License is distributed on an
14+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
# KIND, either express or implied. See the License for the
16+
# specific language governing permissions and limitations
17+
# under the License.
18+
from __future__ import annotations
19+
20+
from datetime import datetime
21+
22+
from airflow.sdk import DAG, task
23+
24+
with DAG(
    dag_id="example_kubernetes_cmd_decorator",
    schedule=None,
    start_date=datetime(2021, 1, 1),
    tags=["example", "cncf", "kubernetes"],
    catchup=False,
) as dag:
    # [START howto_decorator_kubernetes_cmd]
    @task
    def foo() -> str:
        return "foo"

    @task
    def bar() -> str:
        return "bar"

    # Default mode: the returned list is the full command run in the pod.
    @task.kubernetes_cmd(
        image="bash:5.2",
        name="full_cmd",
        in_cluster=False,
    )
    def execute_in_k8s_pod_full_cmd(foo_result: str, bar_result: str) -> list[str]:
        return ["echo", "-e", f"With full cmd:\\t{foo_result}\\t{bar_result}"]

    # With args_only=True the returned list is passed as arguments to the
    # container entrypoint instead -- here, the `bash` command.
    @task.kubernetes_cmd(args_only=True, image="bash:5.2", in_cluster=False)
    def execute_in_k8s_pod_args_only(foo_result: str, bar_result: str) -> list[str]:
        return ["-c", f"echo -e 'With args only:\\t{foo_result}\\t{bar_result}'"]

    # Templating works both in the returned command and in the other templated
    # fields given as decorator parameters.
    @task.kubernetes_cmd(image="bash:5.2", name="my-pod-{{ ti.task_id }}", in_cluster=False)
    def apply_templating(message: str) -> list[str]:
        full_message = "Templated task_id: {{ ti.task_id }}, dag_id: " + message
        return ["echo", full_message]

    foo_output = foo()
    bar_output = bar()

    full_cmd_task = execute_in_k8s_pod_full_cmd(foo_output, bar_output)
    args_only_task = execute_in_k8s_pod_args_only(foo_output, bar_output)

    # Both pod tasks must finish before the templating example runs.
    templated_task = apply_templating("{{ dag.dag_id }}")
    [full_cmd_task, args_only_task] >> templated_task

    # [END howto_decorator_kubernetes_cmd]
71+
72+
73+
from tests_common.test_utils.system_tests import get_test_run
74+
75+
# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
76+
test_run = get_test_run(dag)

0 commit comments

Comments
 (0)