Skip to content

Commit a93d550

Browse files
authored
[TWTR][[AIRFLOW-4939]] Add Default Retries and fix a small DAG refresh bug (twitter-forks#3)
1 parent 3136368 commit a93d550

File tree

3 files changed

+11
-3
lines changed

3 files changed

+11
-3
lines changed

airflow/config_templates/default_airflow.cfg

+3
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,9 @@ killed_task_cleanup_time = 60
173173
# `airflow trigger_dag -c`, the key-value pairs will override the existing ones in params.
174174
dag_run_conf_overrides_params = False
175175

176+
# The number of retries each task is going to have by default. Can be overridden at dag or task level.
177+
default_task_retries = 0
178+
176179
[cli]
177180
# In what way should the cli access the API. The LocalClient will use the
178181
# database directly, while the json_client will use the api running on the

airflow/config_templates/default_test.cfg

+1
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ enable_xcom_pickling = False
5151
killed_task_cleanup_time = 5
5252
secure_mode = False
5353
hostname_callable = socket:getfqdn
54+
default_task_retries = 0
5455

5556
[cli]
5657
api_client = airflow.api.client.local_client

airflow/models.py

+7-3
Original file line numberDiff line numberDiff line change
@@ -297,9 +297,12 @@ def get_dag(self, dag_id):
297297
dag.last_loaded < orm_dag.last_expired
298298
)
299299
):
300+
300301
# Reprocess source file
302+
# TODO: remove the below hack to find relative dag location in webserver
303+
filepath = dag.fileloc if dag else orm_dag.fileloc
301304
found_dags = self.process_file(
302-
filepath=orm_dag.fileloc, only_if_updated=False)
305+
filepath=filepath, only_if_updated=False)
303306

304307
# If the source file no longer exports `dag_id`, delete it from self.dags
305308
if found_dags and dag_id in [found_dag.dag_id for found_dag in found_dags]:
@@ -2342,7 +2345,7 @@ def __init__(
23422345
email=None,
23432346
email_on_retry=True,
23442347
email_on_failure=True,
2345-
retries=0,
2348+
retries=None,
23462349
retry_delay=timedelta(seconds=300),
23472350
retry_exponential_backoff=False,
23482351
max_retry_delay=None,
@@ -2416,7 +2419,8 @@ def __init__(
24162419
self
24172420
)
24182421
self._schedule_interval = schedule_interval
2419-
self.retries = retries
2422+
self.retries = retries if retries is not None else \
2423+
configuration.conf.getint('core', 'default_task_retries', fallback=0)
24202424
self.queue = queue
24212425
self.pool = pool
24222426
self.sla = sla

0 commit comments

Comments
 (0)