Skip to content

Commit 7f6d815

Browse files
committed
Merge branch 'main' into feature/add-dataset-event-post-endpoint
2 parents c0e6780 + c6ba13a commit 7f6d815

File tree

3 files changed

+27
-5
lines changed

3 files changed

+27
-5
lines changed

airflow/providers/cncf/kubernetes/operators/pod.py

+4-2
Original file line numberDiff line numberDiff line change
@@ -793,9 +793,11 @@ def post_complete_action(self, *, pod, remote_pod, **kwargs):
793793
self.callbacks.on_pod_cleanup(pod=pod, client=self.client, mode=ExecutionMode.SYNC)
794794

795795
def cleanup(self, pod: k8s.V1Pod, remote_pod: k8s.V1Pod):
796-
# If a task got marked as failed, "on_kill" method would be called and the pod will be cleaned up
796+
# Skip cleaning the pod in the following scenarios.
797+
# 1. If a task got marked as failed, "on_kill" method would be called and the pod will be cleaned up
797798
# there. Cleaning it up again will raise an exception (which might cause retry).
798-
if self._killed:
799+
# 2. remote pod is null (ex: pod creation failed)
800+
if self._killed or not remote_pod:
799801
return
800802

801803
istio_enabled = self.is_istio_enabled(remote_pod)

kubernetes_tests/test_kubernetes_pod_operator.py

+16-1
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,21 @@ def test_working_pod(self, mock_get_connection):
206206
assert self.expected_pod["spec"] == actual_pod["spec"]
207207
assert self.expected_pod["metadata"]["labels"] == actual_pod["metadata"]["labels"]
208208

209+
def test_skip_cleanup(self, mock_get_connection):
210+
k = KubernetesPodOperator(
211+
namespace="unknown",
212+
image="ubuntu:16.04",
213+
cmds=["bash", "-cx"],
214+
arguments=["echo 10"],
215+
labels=self.labels,
216+
task_id=str(uuid4()),
217+
in_cluster=False,
218+
do_xcom_push=False,
219+
)
220+
context = create_context(k)
221+
with pytest.raises(ApiException):
222+
k.execute(context)
223+
209224
def test_delete_operator_pod(self, mock_get_connection):
210225
k = KubernetesPodOperator(
211226
namespace="default",
@@ -1158,7 +1173,7 @@ def get_op():
11581173
# `create_pod` should be called because though there's still a pod to be found,
11591174
# it will be `already_checked`
11601175
with mock.patch(f"{POD_MANAGER_CLASS}.create_pod") as create_mock:
1161-
with pytest.raises(AirflowException):
1176+
with pytest.raises(Exception):
11621177
k.execute(context)
11631178
create_mock.assert_called_once()
11641179

tests/providers/cncf/kubernetes/operators/test_pod.py

+7-2
Original file line numberDiff line numberDiff line change
@@ -1229,16 +1229,21 @@ def test_previous_pods_ignored_for_reattached(self):
12291229
_, kwargs = k.client.list_namespaced_pod.call_args
12301230
assert "already_checked!=True" in kwargs["label_selector"]
12311231

1232+
@patch(KUB_OP_PATH.format("find_pod"))
12321233
@patch(f"{POD_MANAGER_CLASS}.delete_pod")
12331234
@patch(f"{KPO_MODULE}.KubernetesPodOperator.patch_already_checked")
1234-
def test_mark_checked_unexpected_exception(self, mock_patch_already_checked, mock_delete_pod):
1235+
def test_mark_checked_unexpected_exception(
1236+
self, mock_patch_already_checked, mock_delete_pod, find_pod_mock
1237+
):
12351238
"""If we aren't deleting pods and have an exception, mark it so we don't reattach to it"""
12361239
k = KubernetesPodOperator(
12371240
task_id="task",
12381241
on_finish_action="keep_pod",
12391242
)
1243+
found_pods = [MagicMock(), MagicMock(), MagicMock()]
1244+
find_pod_mock.side_effect = [None] + found_pods
12401245
self.await_pod_mock.side_effect = AirflowException("oops")
1241-
context = create_context(k)
1246+
context = create_context(k, persist_to_db=True)
12421247
with pytest.raises(AirflowException):
12431248
k.execute(context=context)
12441249
mock_patch_already_checked.assert_called_once()

0 commit comments

Comments
 (0)