Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix occasional cleartask failures #18859

Merged
merged 1 commit into from
Oct 11, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 35 additions & 6 deletions tests/models/test_cleartasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,11 @@ def test_clear_task_instances(self, dag_maker):
ti1.run()

with create_session() as session:
qry = session.query(TI).filter(TI.dag_id == dag.dag_id).all()
# we use order_by(task_id) here because for the test DAG structure of ours
# this is equivalent to topological sort. It would not work in general case
# but it works for our case because we specifically constructed test DAGS
# in the way that those two sort methods are equivalent
qry = session.query(TI).filter(TI.dag_id == dag.dag_id).order_by(TI.task_id).all()
clear_task_instances(qry, session, dag=dag)

ti0.refresh_from_db()
Expand Down Expand Up @@ -90,7 +94,11 @@ def test_clear_task_instances_external_executor_id(self, dag_maker):
session.add(ti0)
session.commit()

qry = session.query(TI).filter(TI.dag_id == dag.dag_id).all()
# we use order_by(task_id) here because for the test DAG structure of ours
# this is equivalent to topological sort. It would not work in general case
# but it works for our case because we specifically constructed test DAGS
# in the way that those two sort methods are equivalent
qry = session.query(TI).filter(TI.dag_id == dag.dag_id).order_by(TI.task_id).all()
clear_task_instances(qry, session, dag=dag)

ti0.refresh_from_db()
Expand Down Expand Up @@ -124,7 +132,11 @@ def test_clear_task_instances_dr_state(self, state, last_scheduling, dag_maker):
session = dag_maker.session
session.flush()

qry = session.query(TI).filter(TI.dag_id == dag.dag_id).all()
# we use order_by(task_id) here because for the test DAG structure of ours
# this is equivalent to topological sort. It would not work in general case
# but it works for our case because we specifically constructed test DAGS
# in the way that those two sort methods are equivalent
qry = session.query(TI).filter(TI.dag_id == dag.dag_id).order_by(TI.task_id).all()
clear_task_instances(qry, session, dag_run_state=state, dag=dag)
session.flush()

Expand Down Expand Up @@ -161,7 +173,11 @@ def test_clear_task_instances_without_task(self, dag_maker):
assert not dag.has_task(task1.task_id)

with create_session() as session:
qry = session.query(TI).filter(TI.dag_id == dag.dag_id).all()
# we use order_by(task_id) here because for the test DAG structure of ours
# this is equivalent to topological sort. It would not work in general case
# but it works for our case because we specifically constructed test DAGS
# in the way that those two sort methods are equivalent
qry = session.query(TI).filter(TI.dag_id == dag.dag_id).order_by(TI.task_id).all()
clear_task_instances(qry, session)

# When dag is None, max_tries will be maximum of original max_tries or try_number.
Expand Down Expand Up @@ -195,7 +211,11 @@ def test_clear_task_instances_without_dag(self, dag_maker):
ti1.run()

with create_session() as session:
qry = session.query(TI).filter(TI.dag_id == dag.dag_id).all()
# we use order_by(task_id) here because for the test DAG structure of ours
# this is equivalent to topological sort. It would not work in general case
# but it works for our case because we specifically constructed test DAGS
# in the way that those two sort methods are equivalent
qry = session.query(TI).filter(TI.dag_id == dag.dag_id).order_by(TI.task_id).all()
clear_task_instances(qry, session)

# When dag is None, max_tries will be maximum of original max_tries or try_number.
Expand Down Expand Up @@ -245,7 +265,16 @@ def count_task_reschedule(task_id):

assert count_task_reschedule(ti0.task_id) == 1
assert count_task_reschedule(ti1.task_id) == 1
qry = session.query(TI).filter(TI.dag_id == dag.dag_id, TI.task_id == ti0.task_id).all()
# we use order_by(task_id) here because for the test DAG structure of ours
# this is equivalent to topological sort. It would not work in general case
# but it works for our case because we specifically constructed test DAGS
# in the way that those two sort methods are equivalent
qry = (
session.query(TI)
.filter(TI.dag_id == dag.dag_id, TI.task_id == ti0.task_id)
.order_by(TI.task_id)
.all()
)
clear_task_instances(qry, session, dag=dag)
assert count_task_reschedule(ti0.task_id) == 0
assert count_task_reschedule(ti1.task_id) == 1
Expand Down