Skip to content

Commit 6f03529

Browse files
authored
Fix occasional cleartask failures (#18859)
The cleartask tests occasionally failed due to not consistent sequence in which task clearing was performed. The query did not have ordering and sometimes the tasks were returned in different order than expected.
1 parent 176165d commit 6f03529

File tree

1 file changed

+35
-6
lines changed

1 file changed

+35
-6
lines changed

tests/models/test_cleartasks.py

+35-6
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,11 @@ def test_clear_task_instances(self, dag_maker):
6161
ti1.run()
6262

6363
with create_session() as session:
64-
qry = session.query(TI).filter(TI.dag_id == dag.dag_id).all()
64+
# we use order_by(task_id) here because for the test DAG structure of ours
65+
# this is equivalent to topological sort. It would not work in general case
66+
# but it works for our case because we specifically constructed test DAGS
67+
# in the way that those two sort methods are equivalent
68+
qry = session.query(TI).filter(TI.dag_id == dag.dag_id).order_by(TI.task_id).all()
6569
clear_task_instances(qry, session, dag=dag)
6670

6771
ti0.refresh_from_db()
@@ -90,7 +94,11 @@ def test_clear_task_instances_external_executor_id(self, dag_maker):
9094
session.add(ti0)
9195
session.commit()
9296

93-
qry = session.query(TI).filter(TI.dag_id == dag.dag_id).all()
97+
# we use order_by(task_id) here because for the test DAG structure of ours
98+
# this is equivalent to topological sort. It would not work in general case
99+
# but it works for our case because we specifically constructed test DAGS
100+
# in the way that those two sort methods are equivalent
101+
qry = session.query(TI).filter(TI.dag_id == dag.dag_id).order_by(TI.task_id).all()
94102
clear_task_instances(qry, session, dag=dag)
95103

96104
ti0.refresh_from_db()
@@ -124,7 +132,11 @@ def test_clear_task_instances_dr_state(self, state, last_scheduling, dag_maker):
124132
session = dag_maker.session
125133
session.flush()
126134

127-
qry = session.query(TI).filter(TI.dag_id == dag.dag_id).all()
135+
# we use order_by(task_id) here because for the test DAG structure of ours
136+
# this is equivalent to topological sort. It would not work in general case
137+
# but it works for our case because we specifically constructed test DAGS
138+
# in the way that those two sort methods are equivalent
139+
qry = session.query(TI).filter(TI.dag_id == dag.dag_id).order_by(TI.task_id).all()
128140
clear_task_instances(qry, session, dag_run_state=state, dag=dag)
129141
session.flush()
130142

@@ -161,7 +173,11 @@ def test_clear_task_instances_without_task(self, dag_maker):
161173
assert not dag.has_task(task1.task_id)
162174

163175
with create_session() as session:
164-
qry = session.query(TI).filter(TI.dag_id == dag.dag_id).all()
176+
# we use order_by(task_id) here because for the test DAG structure of ours
177+
# this is equivalent to topological sort. It would not work in general case
178+
# but it works for our case because we specifically constructed test DAGS
179+
# in the way that those two sort methods are equivalent
180+
qry = session.query(TI).filter(TI.dag_id == dag.dag_id).order_by(TI.task_id).all()
165181
clear_task_instances(qry, session)
166182

167183
# When dag is None, max_tries will be maximum of original max_tries or try_number.
@@ -195,7 +211,11 @@ def test_clear_task_instances_without_dag(self, dag_maker):
195211
ti1.run()
196212

197213
with create_session() as session:
198-
qry = session.query(TI).filter(TI.dag_id == dag.dag_id).all()
214+
# we use order_by(task_id) here because for the test DAG structure of ours
215+
# this is equivalent to topological sort. It would not work in general case
216+
# but it works for our case because we specifically constructed test DAGS
217+
# in the way that those two sort methods are equivalent
218+
qry = session.query(TI).filter(TI.dag_id == dag.dag_id).order_by(TI.task_id).all()
199219
clear_task_instances(qry, session)
200220

201221
# When dag is None, max_tries will be maximum of original max_tries or try_number.
@@ -245,7 +265,16 @@ def count_task_reschedule(task_id):
245265

246266
assert count_task_reschedule(ti0.task_id) == 1
247267
assert count_task_reschedule(ti1.task_id) == 1
248-
qry = session.query(TI).filter(TI.dag_id == dag.dag_id, TI.task_id == ti0.task_id).all()
268+
# we use order_by(task_id) here because for the test DAG structure of ours
269+
# this is equivalent to topological sort. It would not work in general case
270+
# but it works for our case because we specifically constructed test DAGS
271+
# in the way that those two sort methods are equivalent
272+
qry = (
273+
session.query(TI)
274+
.filter(TI.dag_id == dag.dag_id, TI.task_id == ti0.task_id)
275+
.order_by(TI.task_id)
276+
.all()
277+
)
249278
clear_task_instances(qry, session, dag=dag)
250279
assert count_task_reschedule(ti0.task_id) == 0
251280
assert count_task_reschedule(ti1.task_id) == 1

0 commit comments

Comments
 (0)