[EWT-7] 6607e48(airflow:master): [AIRFLOW-3160] Load latest_dagruns asynchronously, speed up front page load time apache#4005 #7

Merged · 5 commits · Sep 11, 2019
Changes from all commits
6 changes: 6 additions & 0 deletions .arcconfig
@@ -0,0 +1,6 @@
{
"arc.feature.start.default": "origin/twtr_rb_1.10.0",
"arc.land.onto.default": "twtr_rb_1.10.0",
"base": "git:merge-base(origin/twtr_rb_1.10.0), arc:amended, arc:prompt",
"history.immutable": false
}
20 changes: 20 additions & 0 deletions README_TWITTER.md
@@ -0,0 +1,20 @@
# Developing locally

Here are the steps to develop this dependency locally and interact with source, adapted from
https://confluence.twitter.biz/display/ENG/Overview%3A+Python+3rdparty+in+Source

1. Create a git branch for this change.
2. Edit `airflow/version.py` to change the version.
3. Edit `source/3rdparty/python/BUILD` with the corresponding version.
4. Run the command `python2.7 setup.py bdist_wheel` in the `airflow` directory to build the wheel.
It will be written to `airflow/dist`.
5. Clean out the pex cache: `rm -rf ~/.pex ~/.cache/pants`.
6. Run `ps aux | grep pantsd` to find the pid of the pantsd process.
7. Run `kill $pid` where `$pid` is the pid observed in the previous step.
8. From the `source` directory, run `./pants clean-all`.
9. Now here is the hacky part: the `run-local.sh` and `run-aurora.sh` scripts both run pants commands
   without the option `--python-repos-repos`. You can either edit these scripts to include this option,
   or run a pants command that includes it, which will cache the local artifact you need, e.g.
`./pants test airflow:: --python-repos-repos="['file:///path/to/airflow/dist/', 'https://science-binaries.local.twitter.com/home/third_party/source/python/wheels/', 'https://science-binaries.local.twitter.com/home/third_party/source/python/bootstrap/','https://science-binaries.local.twitter.com/home/third_party/source/python/sources/']"`
10. Now you can start up airflow instances as usual with the newly built wheel! (A quick sanity check that the new wheel is the one resolved follows this list.)
11. See the above link for `Adding Dependencies to science-libraries`.
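A quick way to confirm that the environment actually resolved the locally built wheel (a sketch; the `+twtr` suffix matches the `airflow/version.py` change in this diff, and `apache-airflow` is the distribution name published by `setup.py`):

```python
# Sanity check after step 10: confirm the resolved airflow package carries the
# local version bump from step 2. pkg_resources ships with setuptools, so this
# runs inside any pex or virtualenv that has airflow installed.
import pkg_resources

dist = pkg_resources.get_distribution("apache-airflow")
print(dist.version)  # expect something like "1.10.4+twtr"
assert "+twtr" in dist.version, "resolved an upstream wheel, not the local build"
```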
7 changes: 6 additions & 1 deletion airflow/jobs/scheduler_job.py
@@ -1095,6 +1095,11 @@ def _enqueue_task_instances_with_queued_state(self, simple_dag_bag,
# actually enqueue them
for simple_task_instance in simple_task_instances:
simple_dag = simple_dag_bag.get_dag(simple_task_instance.dag_id)

path = simple_dag.full_filepath
if path.startswith(settings.DAGS_FOLDER):
path = path.replace(settings.DAGS_FOLDER, "DAGS_FOLDER", 1)

command = TI.generate_command(
simple_task_instance.dag_id,
simple_task_instance.task_id,
@@ -1106,7 +1111,7 @@ def _enqueue_task_instances_with_queued_state(self, simple_dag_bag,
ignore_task_deps=False,
ignore_ti_state=False,
pool=simple_task_instance.pool,
file_path=simple_dag.full_filepath,
file_path=path,
pickle_id=simple_dag.pickle_id)

priority = simple_task_instance.priority_weight
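The hunk above relativizes the DAG file path before it is baked into the queued command, so a worker whose DAGs live somewhere else can still locate the file. A minimal sketch of the round trip (helper names here are hypothetical; only the `DAGS_FOLDER` placeholder and `settings.DAGS_FOLDER` come from Airflow itself):

```python
def relativize(full_filepath, dags_folder):
    # Scheduler side (what the hunk adds): swap the local prefix for a
    # portable placeholder so the command no longer hard-codes this
    # machine's dags path.
    if full_filepath.startswith(dags_folder):
        return full_filepath.replace(dags_folder, "DAGS_FOLDER", 1)
    return full_filepath


def expand(file_path, dags_folder):
    # Worker side: undo the substitution against the worker's own folder.
    if file_path.startswith("DAGS_FOLDER"):
        return file_path.replace("DAGS_FOLDER", dags_folder, 1)
    return file_path


# A scheduler at /srv/dags and a worker at /home/airflow/dags now agree:
assert expand(relativize("/srv/dags/etl.py", "/srv/dags"),
              "/home/airflow/dags") == "/home/airflow/dags/etl.py"
```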
227 changes: 106 additions & 121 deletions airflow/migrations/versions/0e2a74e0fc9f_add_time_zone_awareness.py
@@ -25,8 +25,10 @@

from alembic import op
from sqlalchemy.dialects import mysql
from sqlalchemy import text
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = "0e2a74e0fc9f"
down_revision = "d2ae31099d61"
@@ -38,128 +40,111 @@ def upgrade():
conn = op.get_bind()
if conn.dialect.name == "mysql":
conn.execute("SET time_zone = '+00:00'")
cur = conn.execute("SELECT @@explicit_defaults_for_timestamp")
res = cur.fetchall()
if res[0][0] == 0:
raise Exception(
"Global variable explicit_defaults_for_timestamp needs to be on (1) for mysql"
)

op.alter_column(
table_name="chart",
column_name="last_modified",
type_=mysql.TIMESTAMP(fsp=6),
)

op.alter_column(
table_name="dag",
column_name="last_scheduler_run",
type_=mysql.TIMESTAMP(fsp=6),
)
op.alter_column(
table_name="dag", column_name="last_pickled", type_=mysql.TIMESTAMP(fsp=6)
)
op.alter_column(
table_name="dag", column_name="last_expired", type_=mysql.TIMESTAMP(fsp=6)
)

op.alter_column(
table_name="dag_pickle",
column_name="created_dttm",
type_=mysql.TIMESTAMP(fsp=6),
)

op.alter_column(
table_name="dag_run",
column_name="execution_date",
type_=mysql.TIMESTAMP(fsp=6),
)
op.alter_column(
table_name="dag_run", column_name="start_date", type_=mysql.TIMESTAMP(fsp=6)
)
op.alter_column(
table_name="dag_run", column_name="end_date", type_=mysql.TIMESTAMP(fsp=6)
)
# @awilcox July 2018
# we only need to worry about explicit_defaults_for_timestamp if we have
# DATETIME columns that are NOT explicitly declared with NULL
# ... and we don't, all are explicit

# cur = conn.execute("SELECT @@explicit_defaults_for_timestamp")
# res = cur.fetchall()
# if res[0][0] == 0:
# raise Exception("Global variable explicit_defaults_for_timestamp needs to be on (1)
# for mysql")

op.alter_column(table_name='chart', column_name='last_modified',
type_=mysql.TIMESTAMP(fsp=6))

op.alter_column(table_name='dag', column_name='last_scheduler_run',
type_=mysql.TIMESTAMP(fsp=6))
op.alter_column(table_name='dag', column_name='last_pickled', type_=mysql.TIMESTAMP(fsp=6))
op.alter_column(table_name='dag', column_name='last_expired', type_=mysql.TIMESTAMP(fsp=6))

op.alter_column(table_name='dag_pickle', column_name='created_dttm',
type_=mysql.TIMESTAMP(fsp=6))

# NOTE(kwilson): See below.
op.alter_column(table_name='dag_run', column_name='execution_date',
type_=mysql.TIMESTAMP(fsp=6), nullable=False,
server_default=text('CURRENT_TIMESTAMP(6)'))
op.alter_column(table_name='dag_run', column_name='start_date',
type_=mysql.TIMESTAMP(fsp=6))
op.alter_column(table_name='dag_run', column_name='end_date', type_=mysql.TIMESTAMP(fsp=6))

op.alter_column(table_name='import_error', column_name='timestamp',
type_=mysql.TIMESTAMP(fsp=6))

op.alter_column(table_name='job', column_name='start_date', type_=mysql.TIMESTAMP(fsp=6))
op.alter_column(table_name='job', column_name='end_date', type_=mysql.TIMESTAMP(fsp=6))
op.alter_column(table_name='job', column_name='latest_heartbeat',
type_=mysql.TIMESTAMP(fsp=6))

op.alter_column(table_name='known_event', column_name='start_date',
type_=mysql.TIMESTAMP(fsp=6))
op.alter_column(table_name='known_event', column_name='end_date',
type_=mysql.TIMESTAMP(fsp=6))

op.alter_column(table_name='log', column_name='dttm', type_=mysql.TIMESTAMP(fsp=6))
op.alter_column(table_name='log', column_name='execution_date',
type_=mysql.TIMESTAMP(fsp=6))

op.alter_column(table_name='sla_miss', column_name='execution_date',
type_=mysql.TIMESTAMP(fsp=6), nullable=False)
op.alter_column(table_name='sla_miss', column_name='timestamp',
type_=mysql.TIMESTAMP(fsp=6))

op.alter_column(table_name='task_fail', column_name='execution_date',
type_=mysql.TIMESTAMP(fsp=6))
op.alter_column(table_name='task_fail', column_name='start_date',
type_=mysql.TIMESTAMP(fsp=6))
op.alter_column(table_name='task_fail', column_name='end_date',
type_=mysql.TIMESTAMP(fsp=6))

# NOTE(kwilson)
#
# N.B. Here (and above) we explicitly set a default to the string literal
# `CURRENT_TIMESTAMP(6)` to avoid the
# default MySQL behavior for TIMESTAMP without `explicit_defaults_for_timestamp` turned
# on as stated here:
#
# "The first TIMESTAMP column in a table, if not explicitly declared with the NULL
# attribute or an explicit
# DEFAULT or ON UPDATE attribute, is automatically declared with the DEFAULT
# CURRENT_TIMESTAMP and
# ON UPDATE CURRENT_TIMESTAMP attributes." [0]
#
# Because of the "ON UPDATE CURRENT_TIMESTAMP" default, anytime the `task_instance` table
# is UPDATE'd without
# explicitly re-passing the current value for the `execution_date` column, it will end up
# getting clobbered with
# the current timestamp value which breaks `dag_run` <-> `task_instance` alignment and
# causes all sorts of
# scheduler and DB integrity breakage (because `execution_date` is part of the primary key).
#
# We unfortunately cannot turn `explicit_defaults_for_timestamp` on globally ourselves as
# is now technically
# required by Airflow [1], because this has to be set in the my.cnf and we don't control
# that in managed MySQL.
# A request to enable this fleet-wide has been made in MVP-18609.
#
# [0]: https://dev.mysql.com/doc/refman/5.6/en/server-system-variables.html
# #sysvar_explicit_defaults_for_timestamp
# [1]: https://github.com/apache/incubator-airflow/blob/master/UPDATING.md#mysql-setting
# -required

op.alter_column(table_name='task_instance', column_name='execution_date',
type_=mysql.TIMESTAMP(fsp=6), nullable=False,
server_default=text('CURRENT_TIMESTAMP(6)'))
op.alter_column(table_name='task_instance', column_name='start_date',
type_=mysql.TIMESTAMP(fsp=6))
op.alter_column(table_name='task_instance', column_name='end_date',
type_=mysql.TIMESTAMP(fsp=6))
op.alter_column(table_name='task_instance', column_name='queued_dttm',
type_=mysql.TIMESTAMP(fsp=6))

op.alter_column(table_name='xcom', column_name='timestamp', type_=mysql.TIMESTAMP(fsp=6))
op.alter_column(table_name='xcom', column_name='execution_date',
type_=mysql.TIMESTAMP(fsp=6))

op.alter_column(
table_name="import_error",
column_name="timestamp",
type_=mysql.TIMESTAMP(fsp=6),
)

op.alter_column(
table_name="job", column_name="start_date", type_=mysql.TIMESTAMP(fsp=6)
)
op.alter_column(
table_name="job", column_name="end_date", type_=mysql.TIMESTAMP(fsp=6)
)
op.alter_column(
table_name="job",
column_name="latest_heartbeat",
type_=mysql.TIMESTAMP(fsp=6),
)

op.alter_column(
table_name="log", column_name="dttm", type_=mysql.TIMESTAMP(fsp=6)
)
op.alter_column(
table_name="log", column_name="execution_date", type_=mysql.TIMESTAMP(fsp=6)
)

op.alter_column(
table_name="sla_miss",
column_name="execution_date",
type_=mysql.TIMESTAMP(fsp=6),
nullable=False,
)
op.alter_column(
table_name="sla_miss", column_name="timestamp", type_=mysql.TIMESTAMP(fsp=6)
)

op.alter_column(
table_name="task_fail",
column_name="execution_date",
type_=mysql.TIMESTAMP(fsp=6),
)
op.alter_column(
table_name="task_fail",
column_name="start_date",
type_=mysql.TIMESTAMP(fsp=6),
)
op.alter_column(
table_name="task_fail", column_name="end_date", type_=mysql.TIMESTAMP(fsp=6)
)

op.alter_column(
table_name="task_instance",
column_name="execution_date",
type_=mysql.TIMESTAMP(fsp=6),
nullable=False,
)
op.alter_column(
table_name="task_instance",
column_name="start_date",
type_=mysql.TIMESTAMP(fsp=6),
)
op.alter_column(
table_name="task_instance",
column_name="end_date",
type_=mysql.TIMESTAMP(fsp=6),
)
op.alter_column(
table_name="task_instance",
column_name="queued_dttm",
type_=mysql.TIMESTAMP(fsp=6),
)

op.alter_column(
table_name="xcom", column_name="timestamp", type_=mysql.TIMESTAMP(fsp=6)
)
op.alter_column(
table_name="xcom",
column_name="execution_date",
type_=mysql.TIMESTAMP(fsp=6),
)
else:
# sqlite and mssql datetime are fine as is. Therefore, not converting
if conn.dialect.name in ("sqlite", "mssql"):
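The NOTE(kwilson) comment above is the heart of this file's change. A condensed contrast of the two strategies, assembled only from fragments already present in the diff (readable on its own, but runnable only inside an Alembic migration context):

```python
from alembic import op
from sqlalchemy import text
from sqlalchemy.dialects import mysql


def guard_explicit_defaults(conn):
    # Upstream's approach (removed in this fork): refuse to migrate unless the
    # DBA has enabled the global in my.cnf -- not possible on managed MySQL.
    res = conn.execute("SELECT @@explicit_defaults_for_timestamp").fetchall()
    if res[0][0] == 0:
        raise Exception(
            "Global variable explicit_defaults_for_timestamp needs to be "
            "on (1) for mysql")


def pin_explicit_default():
    # This fork's approach: give the column an explicit DEFAULT so MySQL never
    # auto-attaches ON UPDATE CURRENT_TIMESTAMP to the first TIMESTAMP column
    # and clobbers execution_date (part of the primary key) on UPDATEs.
    op.alter_column(table_name='task_instance', column_name='execution_date',
                    type_=mysql.TIMESTAMP(fsp=6), nullable=False,
                    server_default=text('CURRENT_TIMESTAMP(6)'))
```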
15 changes: 8 additions & 7 deletions airflow/security/kerberos.py
@@ -62,13 +62,14 @@ def renew_from_kt(principal, keytab):
sys.exit(subp.returncode)

global NEED_KRB181_WORKAROUND
if NEED_KRB181_WORKAROUND is None:
NEED_KRB181_WORKAROUND = detect_conf_var()
if NEED_KRB181_WORKAROUND:
# (From: HUE-640). Kerberos clock have seconds level granularity. Make sure we
# renew the ticket after the initial valid time.
time.sleep(1.5)
perform_krb181_workaround(principal)
# This breaks for twitter as we dont issue renewable tickets
# if NEED_KRB181_WORKAROUND is None:
# NEED_KRB181_WORKAROUND = detect_conf_var()
# if NEED_KRB181_WORKAROUND:
# # (From: HUE-640). Kerberos clock have seconds level granularity. Make sure we
# # renew the ticket after the initial valid time.
# time.sleep(1.5)
# perform_krb181_workaround()


def perform_krb181_workaround(principal):
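Since the comment says the workaround breaks only because Twitter's KDC issues non-renewable tickets, a middle ground would be to gate it on renewability rather than delete it outright. A sketch (not what this commit does; the "renew until" probe of MIT `klist` output is an assumption to verify, and `perform_krb181_workaround` is the existing helper defined just below in this module):

```python
import subprocess
import time


def maybe_apply_krb181_workaround(principal, ccache):
    out = subprocess.check_output(["klist", "-c", ccache]).decode()
    if "renew until" not in out:
        # Non-renewable ticket (the Twitter case): renewal can only fail.
        return
    # (From HUE-640) Kerberos clocks have second-level granularity; wait out
    # the initial valid time before renewing.
    time.sleep(1.5)
    perform_krb181_workaround(principal)
```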
2 changes: 1 addition & 1 deletion airflow/version.py
@@ -18,4 +18,4 @@
# under the License.
#

version = '1.10.4'
version = '1.10.4+twtr'
30 changes: 23 additions & 7 deletions airflow/www/templates/airflow/dags.html
@@ -103,13 +103,11 @@ <h2>DAGs</h2>

<!-- Column 7: Last Run -->
<td class="text-nowrap latest_dag_run {{ dag.dag_id }}">
{% set last_run = dag.get_last_dagrun(include_externally_triggered=True) %}
{% if last_run and last_run.execution_date %}
<a href="{{ url_for('airflow.graph', dag_id=dag.dag_id, execution_date=last_run.execution_date) }}">
{{ last_run.execution_date.strftime("%Y-%m-%d %H:%M") }}
</a>
<span aria-hidden="true" id="statuses_info" title="Start Date: {{ last_run.start_date.strftime("%Y-%m-%d %H:%M") }}" class="glyphicon glyphicon-info-sign"></span>
{% endif %}
<div height="10" width="10" id='last-run-{{ dag.safe_dag_id }}' style="display: block;">
<a></a>
<img class="loading-last-run" width="15" src="{{ url_for("static", filename="loading.gif") }}">
<span aria-hidden="true" id="statuses_info" title=" " class="glyphicon glyphicon-info-sign" style="display:none"></span>
</div>
</td>

<!-- Column 8: Dag Runs -->
@@ -309,6 +307,24 @@ <h2>DAGs</h2>
}
});
});
d3.json("{{ url_for('airflow.last_dagruns') }}", function(error, json) {
for(var safe_dag_id in json) {
dag_id = json[safe_dag_id].dag_id;
last_run = json[safe_dag_id].last_run;
g = d3.select('div#last-run-' + safe_dag_id)

g.selectAll('a')
.attr("href", "{{ url_for('airflow.graph') }}?dag_id=" + dag_id + "&execution_date=" + last_run)
.text(last_run);

g.selectAll('span')
.attr("data-original-title", "Start Date: " + last_run)
.style('display', null);

g.selectAll(".loading-last-run").remove();
}
d3.selectAll(".loading-last-run").remove();
});
d3.json("{{ url_for('airflow.dag_stats') }}", function(error, json) {
for(var dag_id in json) {
states = json[dag_id];
20 changes: 20 additions & 0 deletions airflow/www/views.py
@@ -668,6 +668,26 @@ def task_stats(self, session=None):
})
return wwwutils.json_response(payload)

@expose('/last_dagruns')
@login_required
@provide_session
def last_dagruns(self, session=None):
DagRun = models.DagRun

dags_to_latest_runs = dict(session.query(
DagRun.dag_id, sqla.func.max(DagRun.execution_date).label('execution_date'))
.group_by(DagRun.dag_id).all())

payload = {}
for dag in dagbag.dags.values():
if dag.dag_id in dags_to_latest_runs and dags_to_latest_runs[dag.dag_id]:
payload[dag.safe_dag_id] = {
'dag_id': dag.dag_id,
'last_run': dags_to_latest_runs[dag.dag_id].strftime("%Y-%m-%d %H:%M")
}

return wwwutils.json_response(payload)

@expose('/code')
@login_required
@provide_session
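For reference, the payload shape this view returns, which the d3 callback in `dags.html` consumes directly (an illustrative client only; the `/admin/airflow` prefix assumes a default classic-UI deployment, and the session must already have passed `@login_required`):

```python
import requests

resp = requests.get("http://localhost:8080/admin/airflow/last_dagruns")
payload = resp.json()
# Keyed by safe_dag_id so the values are usable in DOM ids, e.g.:
# {
#   "my_dag": {"dag_id": "my_dag", "last_run": "2019-09-11 00:00"},
#   ...
# }
for safe_dag_id, info in sorted(payload.items()):
    print(safe_dag_id, info["dag_id"], info["last_run"])
```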