Add Default Retries and fix a small DAG refresh bug #3

Merged · 1 commit · Aug 6, 2019
3 changes: 3 additions & 0 deletions airflow/config_templates/default_airflow.cfg
@@ -173,6 +173,9 @@ killed_task_cleanup_time = 60
 # `airflow trigger_dag -c`, the key-value pairs will override the existing ones in params.
 dag_run_conf_overrides_params = False
 
+# The number of retries each task is going to have by default. Can be overridden at dag or task level.
+default_task_retries = 0
+
 [cli]
 # In what way should the cli access the API. The LocalClient will use the
 # database directly, while the json_client will use the api running on the
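With this change, any task that does not pass `retries` explicitly picks up `[core] default_task_retries`, while an explicit value (including 0) still wins. A minimal sketch of the behavior, assuming `default_task_retries = 2` in airflow.cfg (the DAG and task ids are hypothetical):

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

with DAG(dag_id='retries_demo', start_date=datetime(2019, 8, 1),
         schedule_interval=None) as dag:
    # No `retries` argument: picks up the configured default (2 here).
    uses_default = DummyOperator(task_id='uses_default')
    # An explicit value always wins over the config default.
    never_retries = DummyOperator(task_id='never_retries', retries=0)

assert uses_default.retries == 2
assert never_retries.retries == 0
```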
1 change: 1 addition & 0 deletions airflow/config_templates/default_test.cfg
@@ -51,6 +51,7 @@ enable_xcom_pickling = False
 killed_task_cleanup_time = 5
 secure_mode = False
 hostname_callable = socket:getfqdn
+default_task_retries = 0
 
 [cli]
 api_client = airflow.api.client.local_client
10 changes: 7 additions & 3 deletions airflow/models.py
@@ -297,9 +297,12 @@ def get_dag(self, dag_id):
                     dag.last_loaded < orm_dag.last_expired
                 )
         ):
+
             # Reprocess source file
+            # TODO: remove the below hack to find relative dag location in webserver
+            filepath = dag.fileloc if dag else orm_dag.fileloc
             found_dags = self.process_file(
-                filepath=orm_dag.fileloc, only_if_updated=False)
+                filepath=filepath, only_if_updated=False)
 
             # If the source file no longer exports `dag_id`, delete it from self.dags
             if found_dags and dag_id in [found_dag.dag_id for found_dag in found_dags]:
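For context on the refresh fix: the webserver can end up with an ORM `fileloc` that is relative to the dags folder, so reprocessing by `orm_dag.fileloc` may point at the wrong path; the hack above prefers the absolute path on the already-loaded DAG object. A standalone sketch of the same fallback expression, with hypothetical paths:

```python
# Hypothetical values: the ORM row kept a relative path, while the loaded
# DAG object knows the absolute one.
class FakeDag:
    fileloc = '/home/airflow/dags/etl.py'

dag = FakeDag()               # DAG already loaded in this process
orm_fileloc = 'dags/etl.py'   # relative path recorded in the metadata DB

# Same expression as the patch: prefer the in-memory path when available.
filepath = dag.fileloc if dag else orm_fileloc
assert filepath == '/home/airflow/dags/etl.py'
```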
@@ -2342,7 +2345,7 @@ def __init__(
             email=None,
             email_on_retry=True,
             email_on_failure=True,
-            retries=0,
+            retries=None,
             retry_delay=timedelta(seconds=300),
             retry_exponential_backoff=False,
             max_retry_delay=None,
@@ -2416,7 +2419,8 @@ def __init__(
                 self
             )
         self._schedule_interval = schedule_interval
-        self.retries = retries
+        self.retries = retries if retries is not None else \
+            configuration.conf.getint('core', 'default_task_retries', fallback=0)
         self.queue = queue
         self.pool = pool
         self.sla = sla
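The assignment above resolves in three steps: an explicit `retries` argument, then the `[core] default_task_retries` entry, then a hard fallback of 0 (matching the old default). A sketch of the same lookup, assuming the Airflow 1.10-era `airflow.configuration` import used in the diff:

```python
from airflow import configuration

retries = None  # what BaseOperator receives when a caller omits the argument

# Mirrors the patched assignment: explicit value first, then the [core]
# config entry, then a hard fallback of 0 (the old behavior).
resolved = retries if retries is not None else \
    configuration.conf.getint('core', 'default_task_retries', fallback=0)

print(resolved)  # 0 unless default_task_retries is set in airflow.cfg
```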