@@ -687,6 +687,61 @@ def test_pending_pod_timeout_multi_namespace_mode(
687
687
)
688
688
mock_delete_pod .assert_called_once_with ('foo90' , 'anothernamespace' )
689
689
690
+ def test_clear_not_launched_queued_tasks_not_launched (self , dag_maker , create_dummy_dag , session ):
691
+ """If a pod isn't found for a TI, reset the state to scheduled"""
692
+ mock_kube_client = mock .MagicMock ()
693
+ mock_kube_client .list_namespaced_pod .return_value = k8s .V1PodList (items = [])
694
+
695
+ create_dummy_dag (dag_id = "test_clear" , task_id = "task1" , with_dagrun_type = None )
696
+ dag_run = dag_maker .create_dagrun ()
697
+
698
+ ti = dag_run .task_instances [0 ]
699
+ ti .state = State .QUEUED
700
+ ti .queued_by_job_id = 1
701
+ session .flush ()
702
+
703
+ executor = self .kubernetes_executor
704
+ executor .kube_client = mock_kube_client
705
+ executor .clear_not_launched_queued_tasks (session = session )
706
+
707
+ ti .refresh_from_db ()
708
+ assert ti .state == State .SCHEDULED
709
+ assert mock_kube_client .list_namespaced_pod .call_count == 2
710
+ mock_kube_client .list_namespaced_pod .assert_any_call (
711
+ "default" , label_selector = "dag_id=test_clear,task_id=task1,airflow-worker=1,run_id=test"
712
+ )
713
+ # also check that we fall back to execution_date if we didn't find the pod with run_id
714
+ execution_date_label = pod_generator .datetime_to_label_safe_datestring (ti .execution_date )
715
+ mock_kube_client .list_namespaced_pod .assert_called_with (
716
+ "default" ,
717
+ label_selector = (
718
+ f"dag_id=test_clear,task_id=task1,airflow-worker=1,execution_date={ execution_date_label } "
719
+ ),
720
+ )
721
+
722
+ def test_clear_not_launched_queued_tasks_launched (self , dag_maker , create_dummy_dag , session ):
723
+ """Leave the state alone if a pod already exists"""
724
+ mock_kube_client = mock .MagicMock ()
725
+ mock_kube_client .list_namespaced_pod .return_value = k8s .V1PodList (items = ["something" ])
726
+
727
+ create_dummy_dag (dag_id = "test_clear" , task_id = "task1" , with_dagrun_type = None )
728
+ dag_run = dag_maker .create_dagrun ()
729
+
730
+ ti = dag_run .task_instances [0 ]
731
+ ti .state = State .QUEUED
732
+ ti .queued_by_job_id = 1
733
+ session .flush ()
734
+
735
+ executor = self .kubernetes_executor
736
+ executor .kube_client = mock_kube_client
737
+ executor .clear_not_launched_queued_tasks (session = session )
738
+
739
+ ti .refresh_from_db ()
740
+ assert ti .state == State .QUEUED
741
+ mock_kube_client .list_namespaced_pod .assert_called_once_with (
742
+ "default" , label_selector = "dag_id=test_clear,task_id=task1,airflow-worker=1,run_id=test"
743
+ )
744
+
690
745
691
746
class TestKubernetesJobWatcher (unittest .TestCase ):
692
747
def setUp (self ):
0 commit comments