Skip to content

Commit f757a54

Browse files
authored
[AIRFLOW-6527] Make send_task_to_executor timeout configurable (apache#7143)
1 parent 63aa3db commit f757a54

File tree

4 files changed

+20
-2
lines changed

4 files changed

+20
-2
lines changed

airflow/config_templates/config.yml

+8
Original file line numberDiff line numberDiff line change
@@ -1176,6 +1176,14 @@
11761176
type: string
11771177
example: ~
11781178
default: "prefork"
1179+
- name: operation_timeout
1180+
description: |
1181+
The number of seconds to wait before timing out ``send_task_to_executor`` or
1182+
``fetch_celery_task_state`` operations.
1183+
version_added: ~
1184+
type: int
1185+
example: ~
1186+
default: "2"
11791187
- name: celery_broker_transport_options
11801188
description: |
11811189
This section is for specifying options which can be passed to the

airflow/config_templates/default_airflow.cfg

+4
Original file line numberDiff line numberDiff line change
@@ -563,6 +563,10 @@ ssl_cacert =
563563
# https://docs.celeryproject.org/en/latest/userguide/concurrency/eventlet.html
564564
pool = prefork
565565

566+
# The number of seconds to wait before timing out ``send_task_to_executor`` or
567+
# ``fetch_celery_task_state`` operations.
568+
operation_timeout = 2
569+
566570
[celery_broker_transport_options]
567571

568572
# This section is for specifying options which can be passed to the

airflow/executors/celery_executor.py

+4-2
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@
4141

4242
CELERY_SEND_ERR_MSG_HEADER = 'Error sending Celery task'
4343

44+
OPERATION_TIMEOUT = conf.getint('celery', 'operation_timeout', fallback=2)
45+
4446
'''
4547
To start the celery worker, run the command:
4648
airflow celery worker
@@ -102,7 +104,7 @@ def fetch_celery_task_state(celery_task: Tuple[TaskInstanceKeyType, AsyncResult]
102104
"""
103105

104106
try:
105-
with timeout(seconds=2):
107+
with timeout(seconds=OPERATION_TIMEOUT):
106108
# Accessing state property of celery task will make actual network request
107109
# to get the current state of the task.
108110
return celery_task[0], celery_task[1].state
@@ -122,7 +124,7 @@ def send_task_to_executor(task_tuple: TaskInstanceInCelery) \
122124
"""Sends task to executor."""
123125
key, _, command, queue, task_to_run = task_tuple
124126
try:
125-
with timeout(seconds=2):
127+
with timeout(seconds=OPERATION_TIMEOUT):
126128
result = task_to_run.apply_async(args=[command], queue=queue)
127129
except Exception as e: # pylint: disable=broad-except
128130
exception_traceback = "Celery Task ID: {}\n{}".format(key, traceback.format_exc())

tests/executors/test_celery_executor.py

+4
Original file line numberDiff line numberDiff line change
@@ -187,5 +187,9 @@ def test_gauge_executor_metrics(self, mock_stats_gauge, mock_trigger_tasks, mock
187187
mock_stats_gauge.assert_has_calls(calls)
188188

189189

190+
def test_operation_timeout_config():
191+
assert celery_executor.OPERATION_TIMEOUT == 2
192+
193+
190194
if __name__ == '__main__':
191195
unittest.main()

0 commit comments

Comments
 (0)