Skip to content

Commit db40f8e

Browse files
committed
Fix deferrable mode for SparkKubernetesOperator
1 parent 8a4de86 commit db40f8e

File tree

2 files changed

+20
-7
lines changed

2 files changed

+20
-7
lines changed

providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -796,11 +796,13 @@ def _refresh_cached_properties(self):
796796
del self.pod_manager
797797

798798
def execute_async(self, context: Context) -> None:
799-
self.pod_request_obj = self.build_pod_request_obj(context)
800-
self.pod = self.get_or_create_pod( # must set `self.pod` for `on_kill`
801-
pod_request_obj=self.pod_request_obj,
802-
context=context,
803-
)
799+
if self.pod_request_obj is None:
800+
self.pod_request_obj = self.build_pod_request_obj(context)
801+
if self.pod is None:
802+
self.pod = self.get_or_create_pod( # must set `self.pod` for `on_kill`
803+
pod_request_obj=self.pod_request_obj,
804+
context=context,
805+
)
804806
if self.callbacks:
805807
pod = self.find_pod(self.pod.metadata.namespace, context=context)
806808
for callback in self.callbacks:

providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -268,8 +268,19 @@ def get_or_create_spark_crd(self, launcher: CustomObjectLauncher, context) -> k8
268268
def process_pod_deletion(self, pod, *, reraise=True):
269269
if pod is not None:
270270
if self.delete_on_termination:
271-
self.log.info("Deleting spark job: %s", pod.metadata.name.replace("-driver", ""))
272-
self.launcher.delete_spark_job(pod.metadata.name.replace("-driver", ""))
271+
pod_name = pod.metadata.name.replace("-driver", "")
272+
self.log.info("Deleting spark job: %s", pod_name)
273+
274+
# because of self.launcher is defined in execute method it does not exist for
275+
# deferrable mode and for this reason launcher should be defined one more time
276+
launcher = CustomObjectLauncher(
277+
name=pod_name,
278+
namespace=pod.metadata.namespace,
279+
kube_client=self.client,
280+
custom_obj_api=self.custom_obj_api,
281+
template_body=self.template_body,
282+
)
283+
launcher.delete_spark_job(pod_name)
273284
else:
274285
self.log.info("skipping deleting spark job: %s", pod.metadata.name)
275286

0 commit comments

Comments
 (0)