Skip to content

Commit fd9388c

Browse files
committed
Merge pull request #1290 from jlowin/subdag-backfill-status
Make sure backfill deadlocks raise errors
2 parents 58abca2 + b2844af commit fd9388c

14 files changed

+315
-192
lines changed

airflow/bin/cli.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,8 @@ def process_subdir(subdir):
3939
def get_dag(args):
4040
dagbag = DagBag(process_subdir(args.subdir))
4141
if args.dag_id not in dagbag.dags:
42-
raise AirflowException('dag_id could not be found')
42+
raise AirflowException(
43+
'dag_id could not be found: {}'.format(args.dag_id))
4344
return dagbag.dags[args.dag_id]
4445

4546

airflow/configuration.py

+12-1
Original file line numberDiff line numberDiff line change
@@ -356,7 +356,7 @@ def run_command(command):
356356
TEST_CONFIG = """\
357357
[core]
358358
airflow_home = {AIRFLOW_HOME}
359-
dags_folder = {AIRFLOW_HOME}/dags
359+
dags_folder = {TEST_DAGS_FOLDER}
360360
base_log_folder = {AIRFLOW_HOME}/logs
361361
executor = SequentialExecutor
362362
sql_alchemy_conn = sqlite:///{AIRFLOW_HOME}/unittests.db
@@ -583,6 +583,17 @@ def mkdir_p(path):
583583
else:
584584
AIRFLOW_CONFIG = expand_env_var(os.environ['AIRFLOW_CONFIG'])
585585

586+
# Set up dags folder for unit tests
587+
# this directory won't exist if users install via pip
588+
_TEST_DAGS_FOLDER = os.path.join(
589+
os.path.dirname(os.path.dirname(os.path.realpath(__file__))),
590+
'tests',
591+
'dags')
592+
if os.path.exists(_TEST_DAGS_FOLDER):
593+
TEST_DAGS_FOLDER = _TEST_DAGS_FOLDER
594+
else:
595+
TEST_DAGS_FOLDER = os.path.join(AIRFLOW_HOME, 'dags')
596+
586597

587598
def parameterized_config(template):
588599
"""

airflow/jobs.py

+99-68
Original file line numberDiff line numberDiff line change
@@ -499,6 +499,7 @@ def process_dag(self, dag, executor):
499499
skip_tis = {(ti[0], ti[1]) for ti in qry.all()}
500500

501501
descartes = [obj for obj in product(dag.tasks, active_runs)]
502+
could_not_run = set()
502503
self.logger.info('Checking dependencies on {} tasks instances, minus {} '
503504
'skippable ones'.format(len(descartes), len(skip_tis)))
504505
for task, dttm in descartes:
@@ -513,6 +514,23 @@ def process_dag(self, dag, executor):
513514
elif ti.is_runnable(flag_upstream_failed=True):
514515
self.logger.debug('Firing task: {}'.format(ti))
515516
executor.queue_task_instance(ti, pickle_id=pickle_id)
517+
else:
518+
could_not_run.add(ti)
519+
520+
# this type of deadlock happens when dagruns can't even start and so
521+
# the TI's haven't been persisted to the database.
522+
if len(could_not_run) == len(descartes):
523+
self.logger.error(
524+
'Dag runs are deadlocked for DAG: {}'.format(dag.dag_id))
525+
(session
526+
.query(models.DagRun)
527+
.filter(
528+
models.DagRun.dag_id == dag.dag_id,
529+
models.DagRun.state == State.RUNNING,
530+
models.DagRun.execution_date.in_(active_runs))
531+
.update(
532+
{models.DagRun.state: State.FAILED},
533+
synchronize_session='fetch'))
516534

517535
# Releasing the lock
518536
self.logger.debug("Unlocking DAG (scheduler_lock)")
@@ -553,8 +571,6 @@ def process_events(self, executor, dagbag):
553571
# collect queued tasks for prioritiztion
554572
if ti.state == State.QUEUED:
555573
self.queued_tis.add(ti)
556-
elif ti in self.queued_tis:
557-
self.queued_tis.remove(ti)
558574
else:
559575
# special instructions for failed executions could go here
560576
pass
@@ -583,6 +599,8 @@ def prioritize_queued(self, session, executor, dagbag):
583599
else:
584600
d[ti.pool].append(ti)
585601

602+
self.queued_tis.clear()
603+
586604
dag_blacklist = set(dagbag.paused_dags())
587605
for pool, tis in list(d.items()):
588606
if not pool:
@@ -781,11 +799,12 @@ def _execute(self):
781799

782800
# Build a list of all instances to run
783801
tasks_to_run = {}
784-
failed = []
785-
succeeded = []
786-
started = []
787-
wont_run = []
788-
not_ready_to_run = set()
802+
failed = set()
803+
succeeded = set()
804+
started = set()
805+
skipped = set()
806+
not_ready = set()
807+
deadlocked = set()
789808

790809
for task in self.dag.tasks:
791810
if (not self.include_adhoc) and task.adhoc:
@@ -800,67 +819,56 @@ def _execute(self):
800819
session.commit()
801820

802821
# Triggering what is ready to get triggered
803-
deadlocked = False
804822
while tasks_to_run and not deadlocked:
805-
823+
not_ready.clear()
806824
for key, ti in list(tasks_to_run.items()):
807825

808826
ti.refresh_from_db()
809827
ignore_depends_on_past = (
810828
self.ignore_first_depends_on_past and
811829
ti.execution_date == (start_date or ti.start_date))
812830

813-
# Did the task finish without failing? -- then we're done
814-
if (
815-
ti.state in (State.SUCCESS, State.SKIPPED) and
816-
key in tasks_to_run):
817-
succeeded.append(key)
818-
tasks_to_run.pop(key)
831+
# The task was already marked successful or skipped by a
832+
# different Job. Don't rerun it.
833+
if key not in started:
834+
if ti.state == State.SUCCESS:
835+
succeeded.add(key)
836+
tasks_to_run.pop(key)
837+
continue
838+
elif ti.state == State.SKIPPED:
839+
skipped.add(key)
840+
tasks_to_run.pop(key)
841+
continue
819842

820-
# Is the task runnable? -- the run it
821-
elif ti.is_queueable(
843+
# Is the task runnable? -- then run it
844+
if ti.is_queueable(
822845
include_queued=True,
823846
ignore_depends_on_past=ignore_depends_on_past,
824847
flag_upstream_failed=True):
848+
self.logger.debug('Sending {} to executor'.format(ti))
825849
executor.queue_task_instance(
826850
ti,
827851
mark_success=self.mark_success,
828852
pickle_id=pickle_id,
829853
ignore_dependencies=self.ignore_dependencies,
830854
ignore_depends_on_past=ignore_depends_on_past,
831855
pool=self.pool)
832-
ti.state = State.RUNNING
833-
if key not in started:
834-
started.append(key)
835-
if ti in not_ready_to_run:
836-
not_ready_to_run.remove(ti)
837-
838-
# Mark the task as not ready to run. If the set of tasks
839-
# that aren't ready ever equals the set of tasks to run,
840-
# then the backfill is deadlocked
856+
started.add(key)
857+
858+
# Mark the task as not ready to run
841859
elif ti.state in (State.NONE, State.UPSTREAM_FAILED):
842-
not_ready_to_run.add(ti)
843-
if not_ready_to_run == set(tasks_to_run.values()):
844-
msg = 'BackfillJob is deadlocked: no tasks can be run.'
845-
if any(
846-
t.are_dependencies_met() !=
847-
t.are_dependencies_met(
848-
ignore_depends_on_past=True)
849-
for t in tasks_to_run.values()):
850-
msg += (
851-
' Some of the tasks that were unable to '
852-
'run have "depends_on_past=True". Try running '
853-
'the backfill with the option '
854-
'"ignore_first_depends_on_past=True" '
855-
' or passing "-I" at the command line.')
856-
self.logger.error(msg)
857-
deadlocked = True
858-
wont_run.extend(not_ready_to_run)
859-
tasks_to_run.clear()
860+
self.logger.debug('Added {} to not_ready'.format(ti))
861+
not_ready.add(key)
860862

861863
self.heartbeat()
862864
executor.heartbeat()
863865

866+
# If the set of tasks that aren't ready ever equals the set of
867+
# tasks to run, then the backfill is deadlocked
868+
if not_ready and not_ready == set(tasks_to_run):
869+
deadlocked.update(tasks_to_run.values())
870+
tasks_to_run.clear()
871+
864872
# Reacting to events
865873
for key, state in list(executor.get_event_buffer().items()):
866874
dag_id, task_id, execution_date = key
@@ -882,12 +890,12 @@ def _execute(self):
882890

883891
# task reports skipped
884892
elif ti.state == State.SKIPPED:
885-
wont_run.append(key)
893+
skipped.add(key)
886894
self.logger.error("Skipping {} ".format(key))
887895

888896
# anything else is a failure
889897
else:
890-
failed.append(key)
898+
failed.add(key)
891899
self.logger.error("Task instance {} failed".format(key))
892900

893901
tasks_to_run.pop(key)
@@ -899,18 +907,19 @@ def _execute(self):
899907
if ti.state == State.SUCCESS:
900908
self.logger.info(
901909
'Task instance {} succeeded'.format(key))
902-
succeeded.append(key)
910+
succeeded.add(key)
903911
tasks_to_run.pop(key)
904912

905913
# task reports failure
906914
elif ti.state == State.FAILED:
907915
self.logger.error("Task instance {} failed".format(key))
908-
failed.append(key)
916+
failed.add(key)
909917
tasks_to_run.pop(key)
910918

911919
# this probably won't ever be triggered
912-
elif key in not_ready_to_run:
913-
continue
920+
elif ti in not_ready:
921+
self.logger.info(
922+
"{} wasn't expected to run, but it did".format(ti))
914923

915924
# executor reports success but task does not - this is weird
916925
elif ti.state not in (
@@ -939,29 +948,51 @@ def _execute(self):
939948
ti.handle_failure(msg)
940949
tasks_to_run.pop(key)
941950

942-
msg = (
943-
"[backfill progress] "
944-
"waiting: {0} | "
945-
"succeeded: {1} | "
946-
"kicked_off: {2} | "
947-
"failed: {3} | "
948-
"wont_run: {4} ").format(
949-
len(tasks_to_run),
950-
len(succeeded),
951-
len(started),
952-
len(failed),
953-
len(wont_run))
951+
msg = ' | '.join([
952+
"[backfill progress]",
953+
"waiting: {0}",
954+
"succeeded: {1}",
955+
"kicked_off: {2}",
956+
"failed: {3}",
957+
"skipped: {4}",
958+
"deadlocked: {5}"
959+
]).format(
960+
len(tasks_to_run),
961+
len(succeeded),
962+
len(started),
963+
len(failed),
964+
len(skipped),
965+
len(deadlocked))
954966
self.logger.info(msg)
955967

956968
executor.end()
957969
session.close()
970+
971+
err = ''
958972
if failed:
959-
msg = (
960-
"------------------------------------------\n"
961-
"Some tasks instances failed, "
962-
"here's the list:\n{}".format(failed))
963-
raise AirflowException(msg)
964-
self.logger.info("All done. Exiting.")
973+
err += (
974+
"---------------------------------------------------\n"
975+
"Some task instances failed:\n{}\n".format(failed))
976+
if deadlocked:
977+
err += (
978+
'---------------------------------------------------\n'
979+
'BackfillJob is deadlocked.')
980+
deadlocked_depends_on_past = any(
981+
t.are_dependencies_met() != t.are_dependencies_met(
982+
ignore_depends_on_past=True)
983+
for t in deadlocked)
984+
if deadlocked_depends_on_past:
985+
err += (
986+
'Some of the deadlocked tasks were unable to run because '
987+
'of "depends_on_past" relationships. Try running the '
988+
'backfill with the option '
989+
'"ignore_first_depends_on_past=True" or passing "-I" at '
990+
'the command line.')
991+
err += ' These tasks were unable to run:\n{}\n'.format(deadlocked)
992+
if err:
993+
raise AirflowException(err)
994+
995+
self.logger.info("Backfill done. Exiting.")
965996

966997

967998
class LocalTaskJob(BaseJob):

airflow/models.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -2398,7 +2398,8 @@ def get_active_runs(self):
23982398
# AND there are unfinished tasks...
23992399
any(ti.state in State.unfinished() for ti in task_instances) and
24002400
# AND none of them have dependencies met...
2401-
all(not ti.are_dependencies_met() for ti in task_instances
2401+
all(not ti.are_dependencies_met(session=session)
2402+
for ti in task_instances
24022403
if ti.state in State.unfinished()))
24032404

24042405
for run in active_runs:

airflow/utils/state.py

+18
Original file line numberDiff line numberDiff line change
@@ -69,8 +69,26 @@ def runnable(cls):
6969
cls.QUEUED
7070
]
7171

72+
@classmethod
73+
def finished(cls):
74+
"""
75+
A list of states indicating that a task started and completed a
76+
run attempt. Note that the attempt could have resulted in failure or
77+
have been interrupted; in any case, it is no longer running.
78+
"""
79+
return [
80+
cls.SUCCESS,
81+
cls.SHUTDOWN,
82+
cls.FAILED,
83+
cls.SKIPPED,
84+
]
85+
7286
@classmethod
7387
def unfinished(cls):
88+
"""
89+
A list of states indicating that a task either has not completed
90+
a run or has not even started.
91+
"""
7492
return [
7593
cls.NONE,
7694
cls.QUEUED,

airflow/utils/tests.py

-22
This file was deleted.

scripts/ci/run_tests.sh

+3
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@ if [ "${TRAVIS}" ]; then
1616
echo "Using travis airflow.cfg"
1717
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
1818
cp -f ${DIR}/airflow_travis.cfg ~/airflow/unittests.cfg
19+
20+
ROOTDIR="$(dirname $(dirname $DIR))"
21+
export AIRFLOW__CORE__DAGS_FOLDER="$ROOTDIR/tests/dags"
1922
fi
2023

2124
echo Backend: $AIRFLOW__CORE__SQL_ALCHEMY_CONN

tests/core.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@
4343
DEFAULT_DATE_ISO = DEFAULT_DATE.isoformat()
4444
DEFAULT_DATE_DS = DEFAULT_DATE_ISO[:10]
4545
TEST_DAG_ID = 'unit_tests'
46-
configuration.test_mode()
46+
4747

4848
try:
4949
import cPickle as pickle

tests/dags/README.md

+10
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
# Unit Tests DAGs Folder
2+
3+
This folder contains DAGs for Airflow unit testing.
4+
5+
To access a DAG in this folder, use the following code inside a unit test. Note this only works when `test_mode` is on; otherwise the normal Airflow `DAGS_FOLDER` will take precedence.
6+
7+
```python
8+
dagbag = DagBag()
9+
dag = dagbag.get(dag_id)
10+
```

0 commit comments

Comments
 (0)