@@ -244,7 +244,7 @@ def test_rendering_kubernetes_cmd(
244
244
op_arg : str ,
245
245
expected_command : list [str ],
246
246
):
247
- """Test that templating works in function arguments """
247
+ """Test that templating works in function return value """
248
248
with self .dag :
249
249
250
250
@task .kubernetes_cmd (
@@ -273,6 +273,66 @@ def hello(add_to_command: str):
273
273
assert containers [0 ].command == expected_command
274
274
assert containers [0 ].args == []
275
275
276
+ def test_basic_context_works (self ):
277
+ """Test that decorator works with context as kwargs unpcacked in function arguments"""
278
+ with self .dag :
279
+
280
+ @task .kubernetes_cmd (
281
+ image = "python:3.10-slim-buster" ,
282
+ in_cluster = False ,
283
+ cluster_context = "default" ,
284
+ config_file = "/tmp/fake_file" ,
285
+ namespace = "default" ,
286
+ )
287
+ def hello (** context ):
288
+ return ["echo" , context ["ti" ].task_id , context ["dag_run" ].dag_id ]
289
+
290
+ hello_task = hello ()
291
+
292
+ self .execute_task (hello_task )
293
+
294
+ self .mock_hook .assert_called_once_with (
295
+ conn_id = "kubernetes_default" ,
296
+ in_cluster = False ,
297
+ cluster_context = "default" ,
298
+ config_file = "/tmp/fake_file" ,
299
+ )
300
+ containers = self .mock_create_pod .call_args .kwargs ["pod" ].spec .containers
301
+ assert len (containers ) == 1
302
+
303
+ assert containers [0 ].command == ["echo" , "hello" , DAG_ID ]
304
+ assert containers [0 ].args == []
305
+
306
+ def test_named_context_variables (self ):
307
+ """Test that decorator works with specific context variable as kwargs in function arguments"""
308
+ with self .dag :
309
+
310
+ @task .kubernetes_cmd (
311
+ image = "python:3.10-slim-buster" ,
312
+ in_cluster = False ,
313
+ cluster_context = "default" ,
314
+ config_file = "/tmp/fake_file" ,
315
+ namespace = "default" ,
316
+ )
317
+ def hello (ti = None , dag_run = None ):
318
+ return ["echo" , ti .task_id , dag_run .dag_id ]
319
+
320
+ hello_task = hello ()
321
+
322
+ self .execute_task (hello_task )
323
+
324
+ self .mock_hook .assert_called_once_with (
325
+ conn_id = "kubernetes_default" ,
326
+ in_cluster = False ,
327
+ cluster_context = "default" ,
328
+ config_file = "/tmp/fake_file" ,
329
+ )
330
+ containers = self .mock_create_pod .call_args .kwargs ["pod" ].spec .containers
331
+ assert len (containers ) == 1
332
+
333
+ assert containers [0 ].command == ["echo" , "hello" , DAG_ID ]
334
+ assert containers [0 ].args == []
335
+
276
336
def test_rendering_kubernetes_cmd_decorator_params (self ):
277
337
"""Test that templating works in decorator parameters"""
278
338
with self .dag :
0 commit comments