Skip to content
This repository was archived by the owner on Mar 15, 2023. It is now read-only.

Commit dc9b413

Browse files
aoenwyndhblb
authored andcommitted
[AIRFLOW-3160] Load latest_dagruns asynchronously (apache#4005)
1 parent c5322f4 commit dc9b413

File tree

5 files changed

+96
-29
lines changed

5 files changed

+96
-29
lines changed

airflow/www/templates/airflow/dags.html

+23-9
Original file line numberDiff line numberDiff line change
@@ -118,15 +118,11 @@ <h2>DAGs</h2>
118118

119119
<!-- Column 7: Last Run -->
120120
<td class="text-nowrap latest_dag_run {{ dag.dag_id }}">
121-
{% if dag %}
122-
{% set last_run = dag.get_last_dagrun(include_externally_triggered=True) %}
123-
{% if last_run and last_run.execution_date %}
124-
<a href="{{ url_for('airflow.graph', dag_id=dag.dag_id, execution_date=last_run.execution_date) }}">
125-
{{ last_run.execution_date.strftime("%Y-%m-%d %H:%M") }}
126-
</a>
127-
<span aria-hidden="true" id="statuses_info" title="Start Date: {{ last_run.start_date.strftime("%Y-%m-%d %H:%M") }}" class="glyphicon glyphicon-info-sign"></span>
128-
{% endif %}
129-
{% endif %}
121+
<div height="10" width="10" id='last-run-{{ dag.safe_dag_id }}' style="display: block;">
122+
<a></a>
123+
<img class="loading-last-run" width="15" src="{{ url_for("static", filename="loading.gif") }}">
124+
<span aria-hidden="true" id="statuses_info" title=" " class="glyphicon glyphicon-info-sign" style="display:none"></span>
125+
</div>
130126
</td>
131127

132128
<!-- Column 8: Dag Runs -->
@@ -318,6 +314,24 @@ <h2>DAGs</h2>
318314
}
319315
});
320316
});
317+
d3.json("{{ url_for('airflow.last_dagruns') }}", function(error, json) {
318+
for(var safe_dag_id in json) {
319+
dag_id = json[safe_dag_id].dag_id;
320+
last_run = json[safe_dag_id].last_run;
321+
g = d3.select('div#last-run-' + safe_dag_id)
322+
323+
g.selectAll('a')
324+
.attr("href", "{{ url_for('airflow.graph') }}?dag_id=" + dag_id + "&execution_date=" + last_run)
325+
.text(last_run);
326+
327+
g.selectAll('span')
328+
.attr("data-original-title", "Start Date: " + last_run)
329+
.style('display', null);
330+
331+
g.selectAll(".loading-last-run").remove();
332+
}
333+
d3.selectAll(".loading-last-run").remove();
334+
});
321335
d3.json("{{ url_for('airflow.dag_stats') }}", function(error, json) {
322336
for(var dag_id in json) {
323337
states = json[dag_id];

airflow/www/views.py

+20
Original file line numberDiff line numberDiff line change
@@ -645,6 +645,26 @@ def task_stats(self, session=None):
645645
})
646646
return wwwutils.json_response(payload)
647647

648+
@expose('/last_dagruns')
649+
@login_required
650+
@provide_session
651+
def last_dagruns(self, session=None):
652+
DagRun = models.DagRun
653+
654+
dags_to_latest_runs = dict(session.query(
655+
DagRun.dag_id, sqla.func.max(DagRun.execution_date).label('execution_date'))
656+
.group_by(DagRun.dag_id).all())
657+
658+
payload = {}
659+
for dag in dagbag.dags.values():
660+
if dag.dag_id in dags_to_latest_runs and dags_to_latest_runs[dag.dag_id]:
661+
payload[dag.safe_dag_id] = {
662+
'dag_id': dag.dag_id,
663+
'last_run': dags_to_latest_runs[dag.dag_id].strftime("%Y-%m-%d %H:%M")
664+
}
665+
666+
return wwwutils.json_response(payload)
667+
648668
@expose('/code')
649669
@login_required
650670
def code(self):

airflow/www_rbac/templates/airflow/dags.html

+23-9
Original file line numberDiff line numberDiff line change
@@ -119,15 +119,11 @@ <h2>DAGs</h2>
119119

120120
<!-- Column 7: Last Run -->
121121
<td class="text-nowrap latest_dag_run {{ dag.dag_id }}">
122-
{% if dag %}
123-
{% set last_run = dag.get_last_dagrun(include_externally_triggered=True) %}
124-
{% if last_run and last_run.execution_date %}
125-
<a href="{{ url_for('Airflow.graph', dag_id=dag.dag_id, execution_date=last_run.execution_date) }}">
126-
{{ last_run.execution_date.strftime("%Y-%m-%d %H:%M") }}
127-
</a>
128-
<span aria-hidden="true" id="statuses_info" title="Start Date: {{ last_run.start_date.strftime("%Y-%m-%d %H:%M") }}" class="glyphicon glyphicon-info-sign"></span>
129-
{% endif %}
130-
{% endif %}
122+
<div height="10" width="10" id='last-run-{{ dag.safe_dag_id }}' style="display: block;">
123+
<a></a>
124+
<img class="loading-last-run" width="15" src="{{ url_for("static", filename="loading.gif") }}">
125+
<span aria-hidden="true" id="statuses_info" title=" " class="glyphicon glyphicon-info-sign" style="display:none"></span>
126+
</div>
131127
</td>
132128

133129
<!-- Column 8: Dag Runs -->
@@ -318,6 +314,24 @@ <h2>DAGs</h2>
318314
}
319315
});
320316
});
317+
d3.json("{{ url_for('Airflow.last_dagruns') }}", function(error, json) {
318+
for(var safe_dag_id in json) {
319+
dag_id = json[safe_dag_id].dag_id;
320+
last_run = json[safe_dag_id].last_run;
321+
g = d3.select('div#last-run-' + safe_dag_id)
322+
323+
g.selectAll('a')
324+
.attr("href", "{{ url_for('Airflow.graph') }}?dag_id=" + dag_id + "&execution_date=" + last_run)
325+
.text(last_run);
326+
327+
g.selectAll('span')
328+
.attr("data-original-title", "Start Date: " + last_run)
329+
.style('display', null);
330+
331+
g.selectAll(".loading-last-run").remove();
332+
}
333+
d3.selectAll(".loading-last-run").remove();
334+
});
321335
d3.json("{{ url_for('Airflow.dag_stats') }}", function(error, json) {
322336
for(var dag_id in json) {
323337
states = json[dag_id];

airflow/www_rbac/views.py

+27
Original file line numberDiff line numberDiff line change
@@ -384,6 +384,33 @@ def task_stats(self, session=None):
384384
})
385385
return wwwutils.json_response(payload)
386386

387+
@expose('/last_dagruns')
388+
@has_access
389+
@provide_session
390+
def last_dagruns(self, session=None):
391+
DagRun = models.DagRun
392+
393+
filter_dag_ids = appbuilder.sm.get_accessible_dag_ids()
394+
395+
if not filter_dag_ids:
396+
return
397+
398+
dags_to_latest_runs = dict(session.query(
399+
DagRun.dag_id, sqla.func.max(DagRun.execution_date).label('execution_date'))
400+
.group_by(DagRun.dag_id).all())
401+
402+
payload = {}
403+
for dag in dagbag.dags.values():
404+
dag_accessible = 'all_dags' in filter_dag_ids or dag.dag_id in filter_dag_ids
405+
if (dag_accessible and dag.dag_id in dags_to_latest_runs and
406+
dags_to_latest_runs[dag.dag_id]):
407+
payload[dag.safe_dag_id] = {
408+
'dag_id': dag.dag_id,
409+
'last_run': dags_to_latest_runs[dag.dag_id].strftime("%Y-%m-%d %H:%M")
410+
}
411+
412+
return wwwutils.json_response(payload)
413+
387414
@expose('/code')
388415
@has_dag_access(can_dag_read=True)
389416
@has_access

tests/core.py

+3-11
Original file line numberDiff line numberDiff line change
@@ -1847,17 +1847,6 @@ def test_index(self):
18471847
self.assertIn("DAGs", resp_html)
18481848
self.assertIn("example_bash_operator", resp_html)
18491849

1850-
# The HTML should contain data for the last-run. A link to the specific run,
1851-
# and the text of the date.
1852-
url = "/admin/airflow/graph?" + urlencode({
1853-
"dag_id": self.dag_python.dag_id,
1854-
"execution_date": self.dagrun_python.execution_date,
1855-
}).replace("&", "&amp;")
1856-
self.assertIn(url, resp_html)
1857-
self.assertIn(
1858-
self.dagrun_python.execution_date.strftime("%Y-%m-%d %H:%M"),
1859-
resp_html)
1860-
18611850
def test_query(self):
18621851
response = self.app.get('/admin/queryview/')
18631852
self.assertIn("Ad Hoc Query", response.data.decode('utf-8'))
@@ -1941,6 +1930,9 @@ def test_dag_views(self):
19411930
response = self.app.get(
19421931
'/admin/airflow/task_stats')
19431932
self.assertIn("example_bash_operator", response.data.decode('utf-8'))
1933+
response = self.app.get(
1934+
'/admin/airflow/last_dagruns')
1935+
self.assertIn("example_python_operator", response.data.decode('utf-8'))
19441936
url = (
19451937
"/admin/airflow/success?task_id=print_the_context&"
19461938
"dag_id=example_python_operator&upstream=false&downstream=false&"

0 commit comments

Comments
 (0)