Skip to content

Commit ce9f4ce

Browse files
committed
Fix labels used to find queued KubeExecutor pods (apache#19904)
We need to use the job_id that was used to queue the TI, not the current scheduler's job_id. These can differ naturally with HA schedulers and with scheduler restarts (clearing "queued but not launched TIs" happens before adoption). (cherry picked from commit b80084a)
1 parent 2a57361 commit ce9f4ce

File tree

2 files changed

+56
-2
lines changed

2 files changed

+56
-2
lines changed

airflow/executors/kubernetes_executor.py

+1-2
Original file line numberDiff line numberDiff line change
@@ -463,7 +463,6 @@ def clear_not_launched_queued_tasks(self, session=None) -> None:
463463
del self.last_handled[key]
464464

465465
for task in queued_tasks:
466-
467466
self.log.debug("Checking task %s", task)
468467

469468
# Check to see if we've handled it ourselves recently
@@ -474,7 +473,7 @@ def clear_not_launched_queued_tasks(self, session=None) -> None:
474473
dict_string = "dag_id={},task_id={},airflow-worker={}".format(
475474
pod_generator.make_safe_label_value(task.dag_id),
476475
pod_generator.make_safe_label_value(task.task_id),
477-
pod_generator.make_safe_label_value(str(self.scheduler_job_id)),
476+
pod_generator.make_safe_label_value(str(task.queued_by_job_id)),
478477
)
479478
kwargs = dict(label_selector=dict_string)
480479
if self.kube_config.kube_client_request_args:

tests/executors/test_kubernetes_executor.py

+55
Original file line numberDiff line numberDiff line change
@@ -660,6 +660,61 @@ def test_pending_pod_timeout_multi_namespace_mode(
660660
)
661661
mock_delete_pod.assert_called_once_with('foo90', 'anothernamespace')
662662

663+
def test_clear_not_launched_queued_tasks_not_launched(self, dag_maker, create_dummy_dag, session):
664+
"""If a pod isn't found for a TI, reset the state to scheduled"""
665+
mock_kube_client = mock.MagicMock()
666+
mock_kube_client.list_namespaced_pod.return_value = k8s.V1PodList(items=[])
667+
668+
create_dummy_dag(dag_id="test_clear", task_id="task1", with_dagrun_type=None)
669+
dag_run = dag_maker.create_dagrun()
670+
671+
ti = dag_run.task_instances[0]
672+
ti.state = State.QUEUED
673+
ti.queued_by_job_id = 1
674+
session.flush()
675+
676+
executor = self.kubernetes_executor
677+
executor.kube_client = mock_kube_client
678+
executor.clear_not_launched_queued_tasks(session=session)
679+
680+
ti.refresh_from_db()
681+
assert ti.state == State.SCHEDULED
682+
assert mock_kube_client.list_namespaced_pod.call_count == 2
683+
mock_kube_client.list_namespaced_pod.assert_any_call(
684+
"default", label_selector="dag_id=test_clear,task_id=task1,airflow-worker=1,run_id=test"
685+
)
686+
# also check that we fall back to execution_date if we didn't find the pod with run_id
687+
execution_date_label = pod_generator.datetime_to_label_safe_datestring(ti.execution_date)
688+
mock_kube_client.list_namespaced_pod.assert_called_with(
689+
"default",
690+
label_selector=(
691+
f"dag_id=test_clear,task_id=task1,airflow-worker=1,execution_date={execution_date_label}"
692+
),
693+
)
694+
695+
def test_clear_not_launched_queued_tasks_launched(self, dag_maker, create_dummy_dag, session):
696+
"""Leave the state alone if a pod already exists"""
697+
mock_kube_client = mock.MagicMock()
698+
mock_kube_client.list_namespaced_pod.return_value = k8s.V1PodList(items=["something"])
699+
700+
create_dummy_dag(dag_id="test_clear", task_id="task1", with_dagrun_type=None)
701+
dag_run = dag_maker.create_dagrun()
702+
703+
ti = dag_run.task_instances[0]
704+
ti.state = State.QUEUED
705+
ti.queued_by_job_id = 1
706+
session.flush()
707+
708+
executor = self.kubernetes_executor
709+
executor.kube_client = mock_kube_client
710+
executor.clear_not_launched_queued_tasks(session=session)
711+
712+
ti.refresh_from_db()
713+
assert ti.state == State.QUEUED
714+
mock_kube_client.list_namespaced_pod.assert_called_once_with(
715+
"default", label_selector="dag_id=test_clear,task_id=task1,airflow-worker=1,run_id=test"
716+
)
717+
663718

664719
class TestKubernetesJobWatcher(unittest.TestCase):
665720
def setUp(self):

0 commit comments

Comments
 (0)