Skip to content

Commit 2102122

Browse files
authored
Handle IntegrityError while creating TIs (apache#10136)
While doing a trigger_dag from UI, DagRun gets created first and then WebServer starts creating TIs. Meanwhile, Scheduler also picks up the DagRun and starts creating the TIs, which results in IntegrityError as the Primary key constraint gets violated. This happens when a DAG has a good number of tasks. Also, changing the TIs array with a set for faster lookups for Dags with too many tasks.
1 parent d2540e6 commit 2102122

File tree

2 files changed

+30
-3
lines changed

2 files changed

+30
-3
lines changed

airflow/models/dagrun.py

+11-3
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
from sqlalchemy import (
2222
Boolean, Column, DateTime, Index, Integer, PickleType, String, UniqueConstraint, and_, func, or_,
2323
)
24+
from sqlalchemy.exc import IntegrityError
2425
from sqlalchemy.ext.declarative import declared_attr
2526
from sqlalchemy.orm import synonym
2627
from sqlalchemy.orm.session import Session
@@ -439,10 +440,10 @@ def verify_integrity(self, session=None):
439440
tis = self.get_task_instances(session=session)
440441

441442
# check for removed or restored tasks
442-
task_ids = []
443+
task_ids = set()
443444
for ti in tis:
444445
task_instance_mutation_hook(ti)
445-
task_ids.append(ti.task_id)
446+
task_ids.add(ti.task_id)
446447
task = None
447448
try:
448449
task = dag.get_task(ti.task_id)
@@ -477,7 +478,14 @@ def verify_integrity(self, session=None):
477478
task_instance_mutation_hook(ti)
478479
session.add(ti)
479480

480-
session.commit()
481+
try:
482+
session.commit()
483+
except IntegrityError as err:
484+
self.log.info(str(err))
485+
self.log.info('Hit IntegrityError while creating the TIs for '
486+
f'{dag.dag_id} - {self.execution_date}.')
487+
self.log.info('Doing session rollback.')
488+
session.rollback()
481489

482490
@staticmethod
483491
def get_run(session, dag_id, execution_date):

tests/models/test_dagrun.py

+19
Original file line numberDiff line numberDiff line change
@@ -565,6 +565,25 @@ def with_all_tasks_removed(dag):
565565
flaky_ti.refresh_from_db()
566566
self.assertEqual(State.NONE, flaky_ti.state)
567567

568+
def test_already_added_task_instances_can_be_ignored(self):
569+
dag = DAG('triggered_dag', start_date=DEFAULT_DATE)
570+
dag.add_task(DummyOperator(task_id='first_task', owner='test'))
571+
572+
dagrun = self.create_dag_run(dag)
573+
first_ti = dagrun.get_task_instances()[0]
574+
self.assertEqual('first_task', first_ti.task_id)
575+
self.assertEqual(State.NONE, first_ti.state)
576+
577+
# Lets assume that the above TI was added into DB by webserver, but if scheduler
578+
# is running the same method at the same time it would find 0 TIs for this dag
579+
# and proceeds further to create TIs. Hence mocking DagRun.get_task_instances
580+
# method to return an empty list of TIs.
581+
with mock.patch.object(DagRun, 'get_task_instances') as mock_gtis:
582+
mock_gtis.return_value = []
583+
dagrun.verify_integrity()
584+
first_ti.refresh_from_db()
585+
self.assertEqual(State.NONE, first_ti.state)
586+
568587
@parameterized.expand([(state,) for state in State.task_states])
569588
@mock.patch('airflow.models.dagrun.task_instance_mutation_hook')
570589
def test_task_instance_mutation_hook(self, state, mock_hook):

0 commit comments

Comments
 (0)