Skip to content

Commit b04f4c6

Browse files
committed
refactor: use render_templates from ti and overwrite_rtif_after_execution
1 parent 36dcd67 commit b04f4c6

File tree

2 files changed

+2
-22
lines changed

2 files changed

+2
-22
lines changed

providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/decorators/kubernetes_cmd.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222

2323
from airflow.decorators.base import DecoratedOperator, TaskDecorator, task_decorator_factory
2424
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
25-
from airflow.providers.cncf.kubernetes.template_rendering import refresh_rendered_fields
2625
from airflow.utils.context import context_merge
2726
from airflow.utils.operator_helpers import determine_kwargs
2827

@@ -38,6 +37,7 @@ class _KubernetesCmdDecoratedOperator(DecoratedOperator, KubernetesPodOperator):
3837
custom_operator_name = "@task.kubernetes_cmd"
3938

4039
template_fields: Sequence[str] = KubernetesPodOperator.template_fields
40+
overwrite_rtif_after_execution: bool = True
4141

4242
def __init__(self, *, python_callable: Callable, args_only: bool = False, **kwargs) -> None:
4343
self.args_only = args_only
@@ -76,7 +76,7 @@ def execute(self, context: Context):
7676
else:
7777
self.cmds = generated
7878
self.arguments = []
79-
refresh_rendered_fields(context["task_instance"])
79+
context["ti"].render_templates() # type: ignore[attr-defined]
8080
return super().execute(context)
8181

8282
def _generate_cmds(self, context: Context) -> list[str]:

providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/template_rendering.py

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -77,23 +77,3 @@ def get_rendered_k8s_spec(task_instance: TaskInstance, session: SASession = NEW_
7777
except (TemplateAssertionError, UndefinedError) as e:
7878
raise AirflowException(f"Unable to render a k8s spec for this taskinstance: {e}") from e
7979
return rendered_k8s_spec
80-
81-
82-
@provide_session
83-
def refresh_rendered_fields(task_instance: TaskInstance, session=NEW_SESSION) -> None:
84-
"""
85-
Rewrite the underlying rendered values for a task instance in the metadatabase.
86-
87-
TaskInstance.get_rendered_template_fields() cannot be used because this will retrieve the
88-
RenderedTaskInstanceFields from the metadatabase which doesn't have the runtime-evaluated
89-
command value in case of `@task.kubernetes_cmd` decorator.
90-
"""
91-
from airflow.models.renderedtifields import RenderedTaskInstanceFields
92-
93-
"""Update rendered task instance fields for cases where runtime evaluated, not templated."""
94-
95-
rtif = RenderedTaskInstanceFields(task_instance)
96-
RenderedTaskInstanceFields.write(rtif, session=session)
97-
RenderedTaskInstanceFields.delete_old_records(
98-
task_instance.task_id, task_instance.dag_id, session=session
99-
)

0 commit comments

Comments
 (0)