
Commit fb64f2e

kwlznaoen authored and committed
[TWTR][AIRFLOW-XXX] Twitter Airflow Customizations + Fixup job scheduling without explicit_defaults_for_timestamp.
1 parent 94a46f4 commit fb64f2e

6 files changed, +77 -15 lines changed

.arcconfig (+6)

@@ -0,0 +1,6 @@
+{
+  "arc.feature.start.default": "origin/twtr_rb_1.10.0",
+  "arc.land.onto.default": "twtr_rb_1.10.0",
+  "base": "git:merge-base(origin/twtr_rb_1.10.0), arc:amended, arc:prompt",
+  "history.immutable": false
+}

README_TWITTER.md (+20)

@@ -0,0 +1,20 @@
+# Developing locally
+
+Here are some steps to develop this dependency locally and interact with source, interpreted from
+https://confluence.twitter.biz/display/ENG/Overview%3A+Python+3rdparty+in+Source
+
+1. Create a git branch for this change.
+2. Edit `airflow/version.py` to change the version.
+3. Edit `source/3rdparty/python/BUILD` with the corresponding version.
+4. Run the command `python2.7 setup.py bdist_wheel` in the `airflow` directory to build the wheel.
+   It will be written to `airflow/dist`.
+5. Clean out the pex cache: `rm -rf ~/.pex ~/.cache/pants`.
+6. Run `ps aux | grep pantsd` to find the pid of the pantsd process.
+7. Run `kill $pid`, where `$pid` is the pid just observed.
+8. From the `source` directory, run `./pants clean-all`.
+9. Now here are the hacky parts. The `run-local.sh` and `run-aurora.sh` scripts all run pants commands
+   without the option `--python-repos-repos`. You can either edit these to include this option,
+   or run a pants command that includes it, which will cache the local artifact you need, e.g.
+   `./pants test airflow:: --python-repos-repos="['file:///path/to/airflow/dist/', 'https://science-binaries.local.twitter.com/home/third_party/source/python/wheels/', 'https://science-binaries.local.twitter.com/home/third_party/source/python/bootstrap/','https://science-binaries.local.twitter.com/home/third_party/source/python/sources/']"`
+10. Now you can start up airflow instances as usual with the newly built wheel!
+11. See the above link for `Adding Dependencies to science-libraries`.
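
For convenience, here is a minimal Python sketch (not part of the commit) that automates steps 4-8 of the README above. The `AIRFLOW_DIR` and `SOURCE_DIR` paths are assumptions and must be adjusted to your checkout.

# Illustrative helper, not part of this commit: automates steps 4-8 above.
# AIRFLOW_DIR and SOURCE_DIR are assumed locations; point them at your checkout.
import os
import shutil
import signal
import subprocess

AIRFLOW_DIR = os.path.expanduser("~/workspace/airflow")  # assumed path
SOURCE_DIR = os.path.expanduser("~/workspace/source")    # assumed path


def build_wheel():
    # Step 4: build the wheel; it is written to airflow/dist.
    subprocess.check_call(["python2.7", "setup.py", "bdist_wheel"], cwd=AIRFLOW_DIR)


def clean_pex_cache():
    # Step 5: clean out the pex and pants caches.
    for cache in ("~/.pex", "~/.cache/pants"):
        shutil.rmtree(os.path.expanduser(cache), ignore_errors=True)


def kill_pantsd():
    # Steps 6-7: find and kill any running pantsd process.
    try:
        pids = subprocess.check_output(["pgrep", "-f", "pantsd"]).split()
    except subprocess.CalledProcessError:
        return  # no pantsd running
    for pid in pids:
        os.kill(int(pid), signal.SIGTERM)


def pants_clean_all():
    # Step 8: clean all pants state from the source tree.
    subprocess.check_call(["./pants", "clean-all"], cwd=SOURCE_DIR)


if __name__ == "__main__":
    build_wheel()
    clean_pex_cache()
    kill_pantsd()
    pants_clean_all()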

airflow/jobs.py (+6 -1)

@@ -1317,6 +1317,11 @@ def _enqueue_task_instances_with_queued_state(self, simple_dag_bag, task_instanc
         # actually enqueue them
         for task_instance in task_instances:
             simple_dag = simple_dag_bag.get_dag(task_instance.dag_id)
+
+            path = simple_dag.full_filepath
+            if path.startswith(settings.DAGS_FOLDER):
+                path = path.replace(settings.DAGS_FOLDER, "DAGS_FOLDER", 1)
+
             command = " ".join(TI.generate_command(
                 task_instance.dag_id,
                 task_instance.task_id,
@@ -1328,7 +1333,7 @@ def _enqueue_task_instances_with_queued_state(self, simple_dag_bag, task_instanc
                 ignore_task_deps=False,
                 ignore_ti_state=False,
                 pool=task_instance.pool,
-                file_path=simple_dag.full_filepath,
+                file_path=path,
                 pickle_id=simple_dag.pickle_id))

             priority = task_instance.priority_weight
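
The new lines rewrite an absolute DAG file path into one prefixed with the `DAGS_FOLDER` placeholder before it is baked into the enqueue command, presumably so the command stays valid even when the executing worker keeps its DAGs folder at a different absolute location. A standalone sketch of that substitution follows; the folder value is an assumption standing in for `settings.DAGS_FOLDER`.

DAGS_FOLDER = "/srv/airflow/dags"  # assumed value of settings.DAGS_FOLDER


def relocatable_path(full_filepath, dags_folder=DAGS_FOLDER):
    # Mirror the change above: replace only the leading occurrence of the
    # configured DAGs folder with the literal placeholder "DAGS_FOLDER".
    if full_filepath.startswith(dags_folder):
        return full_filepath.replace(dags_folder, "DAGS_FOLDER", 1)
    return full_filepath


assert relocatable_path("/srv/airflow/dags/etl/daily_job.py") == "DAGS_FOLDER/etl/daily_job.py"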

airflow/migrations/versions/0e2a74e0fc9f_add_time_zone_awareness.py (+36 -6)

@@ -33,17 +33,23 @@

 from alembic import op
 from sqlalchemy.dialects import mysql
+from sqlalchemy import text
 import sqlalchemy as sa


 def upgrade():
     conn = op.get_bind()
     if conn.dialect.name == 'mysql':
         conn.execute("SET time_zone = '+00:00'")
-        cur = conn.execute("SELECT @@explicit_defaults_for_timestamp")
-        res = cur.fetchall()
-        if res[0][0] == 0:
-            raise Exception("Global variable explicit_defaults_for_timestamp needs to be on (1) for mysql")
+        # @awilcox July 2018
+        # we only need to worry about explicit_defaults_for_timestamp if we have
+        # DATETIME columns that are NOT explicitly declared with NULL
+        # ... and we don't, all are explicit
+
+        # cur = conn.execute("SELECT @@explicit_defaults_for_timestamp")
+        # res = cur.fetchall()
+        # if res[0][0] == 0:
+        #     raise Exception("Global variable explicit_defaults_for_timestamp needs to be on (1) for mysql")

     op.alter_column(table_name='chart', column_name='last_modified', type_=mysql.TIMESTAMP(fsp=6))

@@ -53,7 +59,9 @@ def upgrade():

     op.alter_column(table_name='dag_pickle', column_name='created_dttm', type_=mysql.TIMESTAMP(fsp=6))

-    op.alter_column(table_name='dag_run', column_name='execution_date', type_=mysql.TIMESTAMP(fsp=6))
+    # NOTE(kwilson): See below.
+    op.alter_column(table_name='dag_run', column_name='execution_date', type_=mysql.TIMESTAMP(fsp=6),
+                    nullable=False, server_default=text('CURRENT_TIMESTAMP(6)'))
     op.alter_column(table_name='dag_run', column_name='start_date', type_=mysql.TIMESTAMP(fsp=6))
     op.alter_column(table_name='dag_run', column_name='end_date', type_=mysql.TIMESTAMP(fsp=6))

@@ -76,7 +84,29 @@ def upgrade():
     op.alter_column(table_name='task_fail', column_name='start_date', type_=mysql.TIMESTAMP(fsp=6))
     op.alter_column(table_name='task_fail', column_name='end_date', type_=mysql.TIMESTAMP(fsp=6))

-    op.alter_column(table_name='task_instance', column_name='execution_date', type_=mysql.TIMESTAMP(fsp=6), nullable=False)
+    # NOTE(kwilson)
+    #
+    # N.B. Here (and above) we explicitly set a default to the string literal `CURRENT_TIMESTAMP(6)` to avoid the
+    # default MySQL behavior for TIMESTAMP without `explicit_defaults_for_timestamp` turned on as stated here:
+    #
+    # "The first TIMESTAMP column in a table, if not explicitly declared with the NULL attribute or an explicit
+    # DEFAULT or ON UPDATE attribute, is automatically declared with the DEFAULT CURRENT_TIMESTAMP and
+    # ON UPDATE CURRENT_TIMESTAMP attributes." [0]
+    #
+    # Because of the "ON UPDATE CURRENT_TIMESTAMP" default, anytime the `task_instance` table is UPDATE'd without
+    # explicitly re-passing the current value for the `execution_date` column, it will end up getting clobbered with
+    # the current timestamp value which breaks `dag_run` <-> `task_instance` alignment and causes all sorts of
+    # scheduler and DB integrity breakage (because `execution_date` is part of the primary key).
+    #
+    # We unfortunately cannot turn `explicit_defaults_for_timestamp` on globally ourselves as is now technically
+    # required by Airflow [1], because this has to be set in the my.cnf and we don't control that in managed MySQL.
+    # A request to enable this fleet-wide has been made in MVP-18609.
+    #
+    # [0]: https://dev.mysql.com/doc/refman/5.6/en/server-system-variables.html#sysvar_explicit_defaults_for_timestamp
+    # [1]: https://github.com/apache/incubator-airflow/blob/master/UPDATING.md#mysql-setting-required
+
+    op.alter_column(table_name='task_instance', column_name='execution_date', type_=mysql.TIMESTAMP(fsp=6),
+                    nullable=False, server_default=text('CURRENT_TIMESTAMP(6)'))
     op.alter_column(table_name='task_instance', column_name='start_date', type_=mysql.TIMESTAMP(fsp=6))
     op.alter_column(table_name='task_instance', column_name='end_date', type_=mysql.TIMESTAMP(fsp=6))
     op.alter_column(table_name='task_instance', column_name='queued_dttm', type_=mysql.TIMESTAMP(fsp=6))
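
The `SELECT @@explicit_defaults_for_timestamp` check that the NOTE refers to can still be run by hand. Below is a minimal sketch (the connection URL is an assumption) that inspects the server variable and confirms the altered `execution_date` column did not pick up the implicit `ON UPDATE CURRENT_TIMESTAMP` attribute described in the MySQL docs quoted above.

from sqlalchemy import create_engine, text

engine = create_engine("mysql://user:password@localhost/airflow")  # assumed URL

with engine.connect() as conn:
    # Server-side setting the migration used to require.
    flag = conn.execute(text("SELECT @@explicit_defaults_for_timestamp")).scalar()
    # SHOW CREATE TABLE returns (table_name, create_statement).
    ddl = conn.execute(text("SHOW CREATE TABLE task_instance")).fetchone()[1]
    print("explicit_defaults_for_timestamp =", flag)
    print("implicit ON UPDATE present:", "ON UPDATE CURRENT_TIMESTAMP" in ddl)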

airflow/security/kerberos.py (+8 -7)

@@ -60,13 +60,14 @@ def renew_from_kt():
         sys.exit(subp.returncode)

     global NEED_KRB181_WORKAROUND
-    if NEED_KRB181_WORKAROUND is None:
-        NEED_KRB181_WORKAROUND = detect_conf_var()
-    if NEED_KRB181_WORKAROUND:
-        # (From: HUE-640). Kerberos clock have seconds level granularity. Make sure we
-        # renew the ticket after the initial valid time.
-        time.sleep(1.5)
-        perform_krb181_workaround()
+    # This breaks for Twitter as we don't issue renewable tickets
+    # if NEED_KRB181_WORKAROUND is None:
+    #     NEED_KRB181_WORKAROUND = detect_conf_var()
+    # if NEED_KRB181_WORKAROUND:
+    #     # (From: HUE-640). Kerberos clock have seconds level granularity. Make sure we
+    #     # renew the ticket after the initial valid time.
+    #     time.sleep(1.5)
+    #     perform_krb181_workaround()


 def perform_krb181_workaround():
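
The disabled block only matters when the ticket is renewable, which per the comment is not the case here. If you want to verify that locally, here is a small sketch; it assumes an MIT Kerberos `klist` whose output includes a "renew until" line for renewable credentials.

import subprocess


def ticket_is_renewable(ccache=None):
    # Inspect the credential cache; renewable tickets list a "renew until" time.
    cmd = ["klist"]
    if ccache:
        cmd += ["-c", ccache]
    out = subprocess.check_output(cmd).decode("utf-8", "replace")
    return "renew until" in out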

airflow/version.py (+1 -1)

@@ -18,4 +18,4 @@
 # under the License.
 #

-version = '1.10.0'
+version = '1.10.0+twtr5'
