Skip to content

Commit bc9bd1a

Browse files
committed
Fix labels used to find queued KubeExecutor pods (apache#19904)
We need to use the job_id used to queue the TI, not the current schedulers job_id. These can differ naturally with HA schedulers and with scheduler restarts (clearing "queued but not launched TIs" happens before adoption). (cherry picked from commit b80084a) (cherry picked from commit 9cea821)
1 parent ca63be9 commit bc9bd1a

File tree

2 files changed

+56
-1
lines changed

2 files changed

+56
-1
lines changed

airflow/executors/kubernetes_executor.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -460,7 +460,7 @@ def clear_not_launched_queued_tasks(self, session=None) -> None:
460460
pod_generator.make_safe_label_value(task.dag_id),
461461
pod_generator.make_safe_label_value(task.task_id),
462462
pod_generator.datetime_to_label_safe_datestring(task.execution_date),
463-
pod_generator.make_safe_label_value(str(self.scheduler_job_id)),
463+
pod_generator.make_safe_label_value(str(task.queued_by_job_id)),
464464
)
465465
# pylint: enable=protected-access
466466
kwargs = dict(label_selector=dict_string)

tests/executors/test_kubernetes_executor.py

+55
Original file line numberDiff line numberDiff line change
@@ -670,6 +670,61 @@ def test_pending_pod_timeout_multi_namespace_mode(
670670
)
671671
mock_delete_pod.assert_called_once_with('foo90', 'anothernamespace')
672672

673+
def test_clear_not_launched_queued_tasks_not_launched(self, dag_maker, create_dummy_dag, session):
674+
"""If a pod isn't found for a TI, reset the state to scheduled"""
675+
mock_kube_client = mock.MagicMock()
676+
mock_kube_client.list_namespaced_pod.return_value = k8s.V1PodList(items=[])
677+
678+
create_dummy_dag(dag_id="test_clear", task_id="task1", with_dagrun_type=None)
679+
dag_run = dag_maker.create_dagrun()
680+
681+
ti = dag_run.task_instances[0]
682+
ti.state = State.QUEUED
683+
ti.queued_by_job_id = 1
684+
session.flush()
685+
686+
executor = self.kubernetes_executor
687+
executor.kube_client = mock_kube_client
688+
executor.clear_not_launched_queued_tasks(session=session)
689+
690+
ti.refresh_from_db()
691+
assert ti.state == State.SCHEDULED
692+
assert mock_kube_client.list_namespaced_pod.call_count == 2
693+
mock_kube_client.list_namespaced_pod.assert_any_call(
694+
"default", label_selector="dag_id=test_clear,task_id=task1,airflow-worker=1,run_id=test"
695+
)
696+
# also check that we fall back to execution_date if we didn't find the pod with run_id
697+
execution_date_label = pod_generator.datetime_to_label_safe_datestring(ti.execution_date)
698+
mock_kube_client.list_namespaced_pod.assert_called_with(
699+
"default",
700+
label_selector=(
701+
f"dag_id=test_clear,task_id=task1,airflow-worker=1,execution_date={execution_date_label}"
702+
),
703+
)
704+
705+
def test_clear_not_launched_queued_tasks_launched(self, dag_maker, create_dummy_dag, session):
706+
"""Leave the state alone if a pod already exists"""
707+
mock_kube_client = mock.MagicMock()
708+
mock_kube_client.list_namespaced_pod.return_value = k8s.V1PodList(items=["something"])
709+
710+
create_dummy_dag(dag_id="test_clear", task_id="task1", with_dagrun_type=None)
711+
dag_run = dag_maker.create_dagrun()
712+
713+
ti = dag_run.task_instances[0]
714+
ti.state = State.QUEUED
715+
ti.queued_by_job_id = 1
716+
session.flush()
717+
718+
executor = self.kubernetes_executor
719+
executor.kube_client = mock_kube_client
720+
executor.clear_not_launched_queued_tasks(session=session)
721+
722+
ti.refresh_from_db()
723+
assert ti.state == State.QUEUED
724+
mock_kube_client.list_namespaced_pod.assert_called_once_with(
725+
"default", label_selector="dag_id=test_clear,task_id=task1,airflow-worker=1,run_id=test"
726+
)
727+
673728

674729
class TestKubernetesJobWatcher(unittest.TestCase):
675730
def setUp(self):

0 commit comments

Comments
 (0)