
Commit 299b4d8

Authored by vshshjn7 (Vishesh Jain) and Vishesh Jain
[TWTR] CP from 1.10+twtr (#35)
* 99ee040: CP from 1.10+twtr
* 2e01c24: CP from 1.10.4 ([TWTR][AIRFLOW-4939] Fixup use of fallback kwarg in conf.getint)
* 00cb4ae: [TWTR][AIRFLOW-XXXX] Cherry-pick d4a83bc and bump version (#21)
* CP 51b1aee: Relax version requirements (#24)
* CP 67a4d1c: [CX-16266] Change with reference to 1a4c164 commit in open source (#25)
* CP 54bd095: [TWTR][CX-17516] Queue tasks already being handled by the executor (#26)
* CP 87fcc1c: [TWTR][CX-17516] Requeue tasks in the queued state (#27)
* CP 98a1ca9: [AIRFLOW-6625] Explicitly log using utf-8 encoding (apache#7247) (#31)
* Fix models.py and jobs.py after cherry-pick
* Fix typo and bump version

Co-authored-by: Vishesh Jain <visheshj@twitter.com>
Parent: 4ce8d4c · Commit: 299b4d8

13 files changed: +160 -12 lines

airflow/config_templates/default_airflow.cfg (+7 -2)

@@ -126,6 +126,11 @@ sql_alchemy_reconnect_timeout = 300
 # SqlAlchemy supports databases with the concept of multiple schemas.
 sql_alchemy_schema =
 
+# Import path for connect args in SqlAlchemy. Default to an empty dict.
+# This is useful when you want to configure db engine args that SqlAlchemy won't parse in connection string.
+# See https://docs.sqlalchemy.org/en/13/core/engines.html#sqlalchemy.create_engine.params.connect_args
+# sql_alchemy_connect_args =
+
 # The amount of parallelism as a setting to the executor. This defines
 # the max number of task instances that should run simultaneously
 # on this airflow installation
@@ -562,8 +567,8 @@ basedn = dc=example,dc=com
 cacert = /etc/ca/ldap_ca.crt
 search_scope = LEVEL
 
-# This setting allows the use of LDAP servers that either return a
-# broken schema, or do not return a schema.
+# This setting allows the use of LDAP servers that either return a
+# broken schema, or do not return a schema.
 ignore_malformed_schema = False
 
 [mesos]
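The new sql_alchemy_connect_args option takes a dotted import path rather than a literal value. A minimal sketch of how it could be wired up follows; the module name and driver kwargs are hypothetical, and valid connect_args keys depend on your DBAPI driver:

# my_company/sqlalchemy_connect_args.py  (hypothetical module on PYTHONPATH)
# The option must resolve to a plain dict; configure_orm() passes it to
# SQLAlchemy's create_engine(connect_args=...), which hands it to the DBAPI driver.
CONNECT_ARGS = {
    'connect_timeout': 30,                    # illustrative driver kwargs only
    'ssl': {'ca': '/etc/ssl/certs/ca.pem'},
}

# airflow.cfg would then reference it under [core]:
#   sql_alchemy_connect_args = my_company.sqlalchemy_connect_args.CONNECT_ARGS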

airflow/executors/base_executor.py (+2 -2)

@@ -57,9 +57,9 @@ def queue_command(self, simple_task_instance, command, priority=1, queue=None):
         key = simple_task_instance.key
         if key not in self.queued_tasks and key not in self.running:
             self.log.info("Adding to queue: %s", command)
-            self.queued_tasks[key] = (command, priority, queue, simple_task_instance)
         else:
-            self.log.info("could not queue task %s", key)
+            self.log.info("Adding to queue even though already queued or running {}".format(command, key))
+        self.queued_tasks[key] = (command, priority, queue, simple_task_instance)
 
     def queue_task_instance(
         self,

airflow/jobs/scheduler_job.py (+18 -3)

@@ -125,7 +125,10 @@ def _run_file_processor(result_channel,
         stdout = StreamLogWriter(log, logging.INFO)
         stderr = StreamLogWriter(log, logging.WARN)
 
+        log.info("Setting log context for file {}".format(file_path))
+        # log file created here
         set_context(log, file_path)
+        log.info("Successfully set log context for file {}".format(file_path))
         setproctitle("airflow scheduler - DagFileProcessor {}".format(file_path))
 
         try:
@@ -145,6 +148,7 @@ def _run_file_processor(result_channel,
             log.info("Started process (PID=%s) to work on %s",
                      os.getpid(), file_path)
             scheduler_job = SchedulerJob(dag_ids=dag_id_white_list, log=log)
+            log.info("Processing file {}".format(file_path))
             result = scheduler_job.process_file(file_path, pickle_dags)
             result_channel.send(result)
             end_time = time.time()
@@ -167,6 +171,7 @@ def start(self):
        """
        Launch the process and start processing the DAG.
        """
+        self.log.info("Launching process to process DAG at {}".format(self.file_path))
        self._parent_channel, _child_channel = multiprocessing.Pipe()
        self._process = multiprocessing.Process(
            target=type(self)._run_file_processor,
@@ -983,10 +988,9 @@ def _find_executable_task_instances(self, simple_dag_bag, states, session=None):
 
                 if self.executor.has_task(task_instance):
                     self.log.debug(
-                        "Not handling task %s as the executor reports it is running",
+                        "Still handling task %s even though the executor reports it is running",
                         task_instance.key
                     )
-                    continue
                 executable_tis.append(task_instance)
                 open_slots -= 1
                 dag_concurrency_map[dag_id] += 1
@@ -1405,8 +1409,17 @@ def _execute_helper(self):
                                                      State.UP_FOR_RESCHEDULE],
                                                     State.NONE)
 
+                scheduled_dag_ids = ", ".join(simple_dag_bag.dag_ids)
+                self.log.info('DAGs to be executed: {}'.format(scheduled_dag_ids))
+
+                # TODO(CX-17516): State.QUEUED has been added here which is a hack as the Celery
+                # Executor does not reliably enqueue tasks with the MySQL broker, and we have
+                # seen tasks hang after they get queued. The effect of this hack is queued tasks
+                # will constantly be requeued and resent to the executor (Celery).
+                # This should be removed when we switch away from the MySQL Celery backend.
                 self._execute_task_instances(simple_dag_bag,
-                                             (State.SCHEDULED,))
+                                             (State.SCHEDULED, State.QUEUED))
+
             except Exception as e:
                 self.log.error("Error queuing tasks")
                 self.log.exception(e)
@@ -1453,7 +1466,9 @@ def _execute_helper(self):
             sleep(sleep_length)
 
         # Stop any processors
+        self.log.info("Terminating DAG processors")
         self.processor_agent.terminate()
+        self.log.info("All DAG processors terminated")
 
         # Verify that all files were processed, and if so, deactivate DAGs that
         # haven't been touched by the scheduler as they likely have been

airflow/models/baseoperator.py (+1 -1)

@@ -349,7 +349,7 @@ def __init__(
         )
         self._schedule_interval = schedule_interval
         self.retries = retries if retries is not None else \
-            configuration.conf.getint('core', 'default_task_retries', fallback=0)
+            int(configuration.conf.get('core', 'default_task_retries', fallback=0))
         self.queue = queue
         self.pool = pool
         self.sla = sla
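The getint-to-get change follows the pattern of reading the raw value with a fallback and casting explicitly. A minimal sketch of that pattern using the standard-library ConfigParser (not Airflow's own conf object), assuming an option that may be absent:

from configparser import ConfigParser

# 'default_task_retries' is deliberately missing from this config.
parser = ConfigParser()
parser.read_string("[core]\n")

# get() returns the fallback unchanged when the option is absent and a string
# when it is present, so the explicit int() keeps the result an integer either way.
retries = int(parser.get("core", "default_task_retries", fallback=0))
print(retries)  # 0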

airflow/models/dag.py (+6)

@@ -1289,6 +1289,8 @@ def sync_to_db(self, owner=None, sync_time=None, session=None):
         :return: None
         """
 
+        self.log.info("Attempting to sync DAG {} to DB".format(self._dag_id))
+
         if owner is None:
             owner = self.owner
         if sync_time is None:
@@ -1312,8 +1314,12 @@ def sync_to_db(self, owner=None, sync_time=None, session=None):
         session.merge(orm_dag)
         session.commit()
 
+        self.log.info("Synced DAG %s to DB", self._dag_id)
+
         for subdag in self.subdags:
+            self.log.info("Syncing SubDAG %s", subdag._dag_id)
             subdag.sync_to_db(owner=owner, sync_time=sync_time, session=session)
+            self.log.info("Successfully synced SubDAG %s", subdag._dag_id)
 
     @staticmethod
     @provide_session

airflow/models/taskinstance.py (+1)

@@ -941,6 +941,7 @@ def signal_handler(signum, frame):
             self.refresh_from_db(lock_for_update=True)
             self.state = State.SUCCESS
         except AirflowSkipException as e:
+            # This change is in reference to [AIRFLOW-5653][CX-16266]
             # log only if exception has any arguments to prevent log flooding
             if e.args:
                 self.log.info(e)

airflow/settings.py (+9 -1)

@@ -36,6 +36,7 @@
 from airflow.configuration import conf, AIRFLOW_HOME, WEBSERVER_CONFIG  # NOQA F401
 from airflow.contrib.kubernetes.pod import Pod
 from airflow.logging_config import configure_logging
+from airflow.utils.module_loading import import_string
 from airflow.utils.sqlalchemy import setup_event_handlers
 
 log = logging.getLogger(__name__)
@@ -222,7 +223,14 @@ def configure_orm(disable_connection_pool=False):
         # For Python2 we get back a newstr and need a str
         engine_args['encoding'] = engine_args['encoding'].__str__()
 
-    engine = create_engine(SQL_ALCHEMY_CONN, **engine_args)
+    if conf.has_option('core', 'sql_alchemy_connect_args'):
+        connect_args = import_string(
+            conf.get('core', 'sql_alchemy_connect_args')
+        )
+    else:
+        connect_args = {}
+
+    engine = create_engine(SQL_ALCHEMY_CONN, connect_args=connect_args, **engine_args)
     reconnect_timeout = conf.getint('core', 'SQL_ALCHEMY_RECONNECT_TIMEOUT')
     setup_event_handlers(engine, reconnect_timeout)
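For context, a rough sketch of how the dotted path is resolved at engine-creation time; it relies only on import_string (imported in the hunk above) and the SQL_ALCHEMY_CONNECT_ARGS dict defined in the new test module further down, so it assumes the repo's tests package is importable:

from airflow.utils.module_loading import import_string

# import_string() imports the module and returns the named attribute, here the
# dict defined in tests/test_sqlalchemy_config.py.
connect_args = import_string('tests.test_sqlalchemy_config.SQL_ALCHEMY_CONNECT_ARGS')
assert connect_args['test'] == 43503

# An unresolvable path such as 'does.not.exist' raises ImportError, which is what
# test_sql_alchemy_invalid_connect_args asserts.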

airflow/utils/dag_processing.py (+5)

@@ -1209,10 +1209,15 @@ def heartbeat(self):
                 processor.pid, file_path
             )
             self._processors[file_path] = processor
+
+        self.log.info("Number of active file processors: {}".format(len(self._processors)))
 
         # Update heartbeat count.
         self._run_count[self._heart_beat_key] += 1
 
+        simple_dag_ids = ", ".join([simple_dag.dag_id for simple_dag in simple_dags])
+        self.log.info("Processed DAGs: {}".format(simple_dag_ids))
+
         return simple_dags
 
     def _kill_timed_out_processors(self):

airflow/utils/log/file_processor_handler.py (+2)

@@ -138,12 +138,14 @@ def _init_file(self, filename):
 
         if not os.path.exists(directory):
             try:
+                logging.info("Creating directory {}".format(directory))
                 os.makedirs(directory)
             except OSError:
                 if not os.path.isdir(directory):
                     raise
 
         if not os.path.exists(full_path):
+            logging.info("Creating file {}".format(full_path))
             open(full_path, "a").close()
 
         return full_path

airflow/utils/log/file_task_handler.py (+1 -1)

@@ -52,7 +52,7 @@ def set_context(self, ti):
         :param ti: task instance object
         """
         local_loc = self._init_file(ti)
-        self.handler = logging.FileHandler(local_loc)
+        self.handler = logging.FileHandler(local_loc, encoding='utf-8')
         self.handler.setFormatter(self.formatter)
         self.handler.setLevel(self.level)
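The explicit encoding (AIRFLOW-6625) makes task-log writes independent of the system locale. A small illustration outside Airflow: without encoding='utf-8', FileHandler falls back to the locale's preferred encoding, so a non-ASCII log message can fail to be written on an ASCII/POSIX locale.

import logging

# The handler writes UTF-8 regardless of locale; the file path is only an example.
handler = logging.FileHandler('/tmp/example_task.log', encoding='utf-8')
handler.setFormatter(logging.Formatter('%(asctime)s %(levelname)s %(message)s'))

logger = logging.getLogger('utf8_example')
logger.addHandler(handler)
logger.setLevel(logging.INFO)
logger.info('café ✈ naïve')  # non-ASCII characters are written without UnicodeEncodeError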

airflow/version.py (+1 -1)

@@ -18,4 +18,4 @@
 # under the License.
 #
 
-version = '1.10.4+twtr2'
+version = '1.10.4+twtr3'

setup.py (+1 -1)

@@ -339,7 +339,7 @@ def do_setup():
         'flask-login>=0.3, <0.5',
         'flask-swagger==0.2.13',
         'flask-wtf>=0.14.2, <0.15',
-        'funcsigs==1.0.0',
+        'funcsigs==1.0.0, <2.0.0',
         'future>=0.16.0, <0.17',
         'gunicorn>=19.5.0, <20.0',
         'iso8601>=0.1.12',

tests/test_sqlalchemy_config.py (new file, +106)

@@ -0,0 +1,106 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import unittest

from sqlalchemy.pool import NullPool

from airflow import settings
from tests.compat import patch
from tests.test_utils.config import conf_vars

SQL_ALCHEMY_CONNECT_ARGS = {
    'test': 43503,
    'dict': {
        'is': 1,
        'supported': 'too'
    }
}


class TestSqlAlchemySettings(unittest.TestCase):
    def setUp(self):
        self.old_engine = settings.engine
        self.old_session = settings.Session
        self.old_conn = settings.SQL_ALCHEMY_CONN
        settings.SQL_ALCHEMY_CONN = "mysql+foobar://user:pass@host/dbname?inline=param&another=param"

    def tearDown(self):
        settings.engine = self.old_engine
        settings.Session = self.old_session
        settings.SQL_ALCHEMY_CONN = self.old_conn

    @patch('airflow.settings.setup_event_handlers')
    @patch('airflow.settings.scoped_session')
    @patch('airflow.settings.sessionmaker')
    @patch('airflow.settings.create_engine')
    def test_configure_orm_with_default_values(self,
                                               mock_create_engine,
                                               mock_sessionmaker,
                                               mock_scoped_session,
                                               mock_setup_event_handlers):
        settings.configure_orm()
        mock_create_engine.assert_called_once_with(
            settings.SQL_ALCHEMY_CONN,
            connect_args={},
            encoding='utf-8',
            max_overflow=10,
            pool_pre_ping=True,
            pool_recycle=1800,
            pool_size=5
        )

    @patch('airflow.settings.setup_event_handlers')
    @patch('airflow.settings.scoped_session')
    @patch('airflow.settings.sessionmaker')
    @patch('airflow.settings.create_engine')
    def test_sql_alchemy_connect_args(self,
                                      mock_create_engine,
                                      mock_sessionmaker,
                                      mock_scoped_session,
                                      mock_setup_event_handlers):
        config = {
            ('core', 'sql_alchemy_connect_args'): 'tests.test_sqlalchemy_config.SQL_ALCHEMY_CONNECT_ARGS',
            ('core', 'sql_alchemy_pool_enabled'): 'False'
        }
        with conf_vars(config):
            settings.configure_orm()
            mock_create_engine.assert_called_once_with(
                settings.SQL_ALCHEMY_CONN,
                connect_args=SQL_ALCHEMY_CONNECT_ARGS,
                poolclass=NullPool,
                encoding='utf-8'
            )

    @patch('airflow.settings.setup_event_handlers')
    @patch('airflow.settings.scoped_session')
    @patch('airflow.settings.sessionmaker')
    @patch('airflow.settings.create_engine')
    def test_sql_alchemy_invalid_connect_args(self,
                                              mock_create_engine,
                                              mock_sessionmaker,
                                              mock_scoped_session,
                                              mock_setup_event_handlers):
        config = {
            ('core', 'sql_alchemy_connect_args'): 'does.not.exist',
            ('core', 'sql_alchemy_pool_enabled'): 'False'
        }
        with self.assertRaises(ImportError):
            with conf_vars(config):
                settings.configure_orm()
