Skip to content

Commit b4f7161

Browse files
ephraimbuddyCloud Composer Team
authored and
Cloud Composer Team
committed
Send SLA callback to processor when DagRun has completed (#20683)
* Send SLA callback to processor when DagRun has completed Currently, sla callbacks are sent every time a dagrun is examined. This causes sla callbacks to be run too often and cause processors to timeout at times. Also deleted dags are not recreated when there are many slas. This PR addresses this by sending SLA callbacks to processor when a dagrun completes GitOrigin-RevId: 905baf9fa5402ccc062536915fd1911d812f625b
1 parent 5130fd6 commit b4f7161

File tree

2 files changed

+46
-2
lines changed

2 files changed

+46
-2
lines changed

airflow/jobs/scheduler_job.py

+3-2
Original file line numberDiff line numberDiff line change
@@ -1071,6 +1071,7 @@ def _schedule_dag_run(
10711071

10721072
# Send SLA & DAG Success/Failure Callbacks to be executed
10731073
self._send_dag_callbacks_to_processor(dag, callback_to_execute)
1074+
self._send_sla_callbacks_to_processor(dag)
10741075
# Because we send the callback here, we need to return None
10751076
return callback
10761077

@@ -1086,7 +1087,8 @@ def _schedule_dag_run(
10861087
# Work out if we should allow creating a new DagRun now?
10871088
if self._should_update_dag_next_dagruns(dag, dag_model, active_runs):
10881089
dag_model.calculate_dagrun_date_fields(dag, dag.get_run_data_interval(dag_run))
1089-
1090+
# Send SLA Callbacks to be executed
1091+
self._send_sla_callbacks_to_processor(dag)
10901092
# This will do one query per dag run. We "could" build up a complex
10911093
# query to update all the TIs across all the execution dates and dag
10921094
# IDs in a single query, but it turns out that can be _very very slow_
@@ -1115,7 +1117,6 @@ def _send_dag_callbacks_to_processor(self, dag: DAG, callback: Optional[DagCallb
11151117
if not self.processor_agent:
11161118
raise ValueError("Processor agent is not started.")
11171119

1118-
self._send_sla_callbacks_to_processor(dag)
11191120
if callback:
11201121
self.processor_agent.send_callback_to_execute(callback)
11211122

tests/jobs/test_scheduler_job.py

+43
Original file line numberDiff line numberDiff line change
@@ -2574,6 +2574,49 @@ def test_send_sla_callbacks_to_processor_sla_with_task_slas(self, dag_maker):
25742574
full_filepath=dag.fileloc, dag_id=dag_id
25752575
)
25762576

2577+
def test_sla_sent_to_processor_when_dagrun_completes(self, dag_maker, session):
2578+
"""Test that SLA is sent to the processor when the dagrun completes"""
2579+
with dag_maker() as dag:
2580+
DummyOperator(task_id='task', sla=timedelta(hours=1))
2581+
self.scheduler_job = SchedulerJob(subdir=os.devnull)
2582+
self.scheduler_job.executor = MockExecutor(do_update=False)
2583+
self.scheduler_job.processor_agent = mock.MagicMock(spec=DagFileProcessorAgent)
2584+
mock_sla_callback = mock.MagicMock()
2585+
self.scheduler_job._send_sla_callbacks_to_processor = mock_sla_callback
2586+
assert session.query(DagRun).count() == 0
2587+
dag_models = DagModel.dags_needing_dagruns(session).all()
2588+
self.scheduler_job._create_dag_runs(dag_models, session)
2589+
dr = session.query(DagRun).one()
2590+
dr.state = DagRunState.SUCCESS
2591+
ti = dr.get_task_instance('task', session)
2592+
ti.state = TaskInstanceState.SUCCESS
2593+
session.merge(ti)
2594+
session.merge(dr)
2595+
session.flush()
2596+
self.scheduler_job._schedule_dag_run(dr, session)
2597+
dag = self.scheduler_job.dagbag.get_dag(dag.dag_id)
2598+
self.scheduler_job._send_sla_callbacks_to_processor.assert_called_once_with(dag)
2599+
2600+
def test_sla_sent_to_processor_when_dagrun_timeout(self, dag_maker, session):
2601+
"""Test that SLA is sent to the processor when the dagrun timeout"""
2602+
with dag_maker(dagrun_timeout=datetime.timedelta(seconds=60)) as dag:
2603+
DummyOperator(task_id='task', sla=timedelta(hours=1))
2604+
self.scheduler_job = SchedulerJob(subdir=os.devnull)
2605+
self.scheduler_job.executor = MockExecutor(do_update=False)
2606+
self.scheduler_job.processor_agent = mock.MagicMock(spec=DagFileProcessorAgent)
2607+
mock_sla_callback = mock.MagicMock()
2608+
self.scheduler_job._send_sla_callbacks_to_processor = mock_sla_callback
2609+
assert session.query(DagRun).count() == 0
2610+
dag_models = DagModel.dags_needing_dagruns(session).all()
2611+
self.scheduler_job._create_dag_runs(dag_models, session)
2612+
dr = session.query(DagRun).one()
2613+
dr.start_date = timezone.utcnow() - datetime.timedelta(days=1)
2614+
session.merge(dr)
2615+
session.flush()
2616+
self.scheduler_job._schedule_dag_run(dr, session)
2617+
dag = self.scheduler_job.dagbag.get_dag(dag.dag_id)
2618+
self.scheduler_job._send_sla_callbacks_to_processor.assert_called_once_with(dag)
2619+
25772620
def test_create_dag_runs(self, dag_maker):
25782621
"""
25792622
Test various invariants of _create_dag_runs.

0 commit comments

Comments
 (0)