Skip to content

Commit 564e85e

Browse files
sjmiller609 authored and kaxil committed
[ASTRO-364] Handle timeout watching for tasks to finish (apache#64)
(cherry picked from commit e6debf3)
1 parent 84fa48f commit 564e85e

File tree

1 file changed

+47
-13
lines changed

1 file changed

+47
-13
lines changed

airflow/contrib/executors/kubernetes_executor.py

+47 -13
Original file line number | Diff line number | Diff line change
@@ -29,7 +29,7 @@
2929
import kubernetes
3030
from kubernetes import watch, client
3131
from kubernetes.client.rest import ApiException
32-
from urllib3.exceptions import HTTPError
32+
from urllib3.exceptions import HTTPError, ReadTimeoutError
3333

3434
from airflow.configuration import conf
3535
from airflow.contrib.kubernetes.istio import Istio
@@ -320,6 +320,21 @@ def _validate(self):
320320
class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin):
321321
"""Watches for Kubernetes jobs"""
322322
def __init__(self, namespace, watcher_queue, resource_version, worker_uuid, kube_config):
323+
"""Initialize KubernetesJobWatcher, a background process that informs the
324+
AirflowKubernetesScheduler when tasks are completed.
325+
326+
:param namespace: The namespace which will contain all tasks
327+
:type namespace: str
328+
:param watcher_queue: Used to inform the Scheduler of completed tasks
329+
:type watcher_queue: multiprocessing.Queue
330+
:param resource_version: A counter to indicate how many times a kubernetes resource changed
331+
:type resource_version: str, but looks like an int, for example "0"
332+
:param worker_uuid: A label selector used to locate pods that belong to this executor
333+
:type worker_uuid: str
334+
:param kube_config: Configuration for Kubernetes
335+
:type kube_config: KubeConfig
336+
337+
"""
323338
multiprocessing.Process.__init__(self)
324339
self.namespace = namespace
325340
self.worker_uuid = worker_uuid
@@ -339,8 +354,8 @@ def run(self):
339354
self.log.exception('Unknown error in KubernetesJobWatcher. Failing')
340355
raise
341356
else:
342-
self.log.warning('Watch died gracefully, starting back up with: '
343-
'last resource_version: %s', self.resource_version)
357+
self.log.info('Watcher will start back up with: '
358+
'last resource_version: %s', self.resource_version)
344359

345360
def _run(self, kube_client, resource_version, worker_uuid, kube_config):
346361
self.log.info(
@@ -350,15 +365,34 @@ def _run(self, kube_client, resource_version, worker_uuid, kube_config):
350365
watcher = watch.Watch()
351366

352367
kwargs = {'label_selector': 'airflow-worker={}'.format(worker_uuid)}
368+
353369
if resource_version:
354370
kwargs['resource_version'] = resource_version
371+
355372
if kube_config.kube_client_request_args:
356373
for key, value in kube_config.kube_client_request_args.items():
357374
kwargs[key] = value
358375

359-
last_resource_version = None
360-
for event in watcher.stream(kube_client.list_namespaced_pod, self.namespace,
361-
**kwargs):
376+
if resource_version:
377+
last_resource_version = resource_version
378+
else:
379+
last_resource_version = None
380+
381+
event_generator = watcher.stream(kube_client.list_namespaced_pod,
382+
self.namespace,
383+
**kwargs)
384+
while True:
385+
try:
386+
event = next(event_generator)
387+
except StopIteration:
388+
break
389+
except ReadTimeoutError:
390+
self.log.info("Timed out waiting for an event.")
391+
break
392+
except HTTPError:
393+
self.log.info("Terminating connection to kube-api.")
394+
break
395+
362396
task = event['object']
363397
self.log.info(
364398
'Event: %s had an event of type %s',
@@ -437,13 +471,11 @@ def _make_kube_watcher(self):
437471
watcher.start()
438472
return watcher
439473

440-
def _health_check_kube_watcher(self):
441-
if self.kube_watcher.is_alive():
442-
pass
443-
else:
474+
def _ensure_kube_watcher(self):
475+
if not self.kube_watcher.is_alive():
476+
settings.Stats.incr("executor.kube_watcher.restarts")
444477
self.log.error(
445-
'Error while health checking kube watcher process. '
446-
'Process died for unknown reasons')
478+
'Kubernetes job watcher process stopped. Restarting')
447479
self.kube_watcher = self._make_kube_watcher()
448480

449481
def run_next(self, next_job):
@@ -491,7 +523,7 @@ def sync(self):
491523
:return:
492524
493525
"""
494-
self._health_check_kube_watcher()
526+
self._ensure_kube_watcher()
495527
while True:
496528
try:
497529
task = self.watcher_queue.get_nowait()
@@ -500,6 +532,8 @@ def sync(self):
500532
finally:
501533
self.watcher_queue.task_done()
502534
except Empty:
535+
# When self.watcher_queue is empty,
536+
# this function returns
503537
break
504538

505539
def process_watcher_task(self, task):

0 commit comments

Comments
 (0)