import copy
from datetime import datetime, timedelta
import getpass
import imp
import jinja2
import logging
import os
import dill
import re
import signal
import socket
import sys
from sqlalchemy import (
Column, Integer, String, DateTime, Text, Boolean, ForeignKey, PickleType,
Index,)
from sqlalchemy import func, or_
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.dialects.mysql import LONGTEXT
from sqlalchemy.orm import relationship
from airflow.executors import DEFAULT_EXECUTOR, LocalExecutor
from airflow.configuration import conf
from airflow import settings
from airflow import utils
from airflow.utils import State
from airflow.utils import apply_defaults, provide_session
Base = declarative_base()
ID_LEN = 250
SQL_ALCHEMY_CONN = conf.get('core', 'SQL_ALCHEMY_CONN')
DAGS_FOLDER = os.path.expanduser(conf.get('core', 'DAGS_FOLDER'))
if 'mysql' in SQL_ALCHEMY_CONN:
LongText = LONGTEXT
else:
LongText = Text
def clear_task_instances(tis, session):
'''
Clears a set of task instances, but makes sure the running ones
get killed.
'''
job_ids = []
for ti in tis:
if ti.state == State.RUNNING:
if ti.job_id:
ti.state = State.SHUTDOWN
job_ids.append(ti.job_id)
else:
session.delete(ti)
if job_ids:
from airflow.jobs import BaseJob as BJ # HA!
for job in session.query(BJ).filter(BJ.id.in_(job_ids)).all():
job.state = State.SHUTDOWN
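# Typical call pattern (illustrative sketch, not executed; 'example_dag' is a
# hypothetical dag_id): fetch the task instances to clear within a session and
# pass both in. RUNNING instances tied to a job are flagged SHUTDOWN rather
# than deleted, so the corresponding jobs get killed.
#
#   session = settings.Session()
#   tis = session.query(TaskInstance).filter(
#       TaskInstance.dag_id == 'example_dag').all()
#   clear_task_instances(tis, session)
#   session.commit()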
class DagBag(object):
"""
    A dagbag is a collection of dags, parsed out of a folder tree, carrying
    high level configuration settings such as which database to use as a
    backend and which executor to use to fire off tasks. This makes it easier
    to run distinct environments, for example production and development,
    tests, or different teams or security profiles. What would have been
    system level settings are now dagbag level, so that one system can run
    multiple, independent sets of settings.
:param dag_folder: the folder to scan to find DAGs
:type dag_folder: str
:param executor: the executor to use when executing task instances
in this DagBag
:param include_examples: whether to include the examples that ship
with airflow or not
:type include_examples: bool
:param sync_to_db: whether to sync the properties of the DAGs to
the metadata DB while finding them, typically should be done
by the scheduler job only
:type sync_to_db: bool
"""
def __init__(
self,
dag_folder=None,
executor=DEFAULT_EXECUTOR,
include_examples=conf.getboolean('core', 'LOAD_EXAMPLES'),
sync_to_db=False):
dag_folder = dag_folder or DAGS_FOLDER
logging.info("Filling up the DagBag from " + dag_folder)
self.dag_folder = dag_folder
self.dags = {}
self.sync_to_db = sync_to_db
self.file_last_changed = {}
self.executor = executor
self.collect_dags(dag_folder)
if include_examples:
example_dag_folder = os.path.join(
os.path.dirname(__file__),
'example_dags')
self.collect_dags(example_dag_folder)
if sync_to_db:
self.deactivate_inactive_dags()
def get_dag(self, dag_id):
"""
Gets the DAG out of the dictionary, and refreshes it if expired
"""
if dag_id in self.dags:
dag = self.dags[dag_id]
if dag.is_subdag:
orm_dag = DagModel.get_current(dag.parent_dag.dag_id)
else:
orm_dag = DagModel.get_current(dag_id)
if orm_dag and dag.last_loaded < (
orm_dag.last_expired or datetime(2100, 1, 1)):
self.process_file(
filepath=orm_dag.fileloc, only_if_updated=False)
dag = self.dags[dag_id]
else:
orm_dag = DagModel.get_current(dag_id)
self.process_file(
filepath=orm_dag.fileloc, only_if_updated=False)
if dag_id in self.dags:
dag = self.dags[dag_id]
else:
dag = None
return dag
def process_file(self, filepath, only_if_updated=True, safe_mode=True):
"""
        Given a path to a python module, this method imports the module and
        looks for DAG objects within it.
"""
try:
# This failed before in what may have been a git sync
# race condition
dttm = datetime.fromtimestamp(os.path.getmtime(filepath))
except:
dttm = datetime(2001, 1, 1)
mod_name, file_ext = os.path.splitext(os.path.split(filepath)[-1])
mod_name = 'unusual_prefix_' + mod_name
if safe_mode:
# Skip file if no obvious references to airflow or DAG are found.
f = open(filepath, 'r')
content = f.read()
f.close()
if not all([s in content for s in ('DAG', 'airflow')]):
return
if (
not only_if_updated or
filepath not in self.file_last_changed or
dttm != self.file_last_changed[filepath]):
try:
logging.info("Importing " + filepath)
if mod_name in sys.modules:
del sys.modules[mod_name]
m = imp.load_source(mod_name, filepath)
except:
logging.error("Failed to import: " + filepath)
logging.exception("")
self.file_last_changed[filepath] = dttm
return
for dag in m.__dict__.values():
if isinstance(dag, DAG):
dag.full_filepath = filepath
dag.is_subdag = False
self.bag_dag(dag, parent_dag=dag, root_dag=dag)
# dag.pickle()
self.file_last_changed[filepath] = dttm
def bag_dag(self, dag, parent_dag, root_dag):
"""
Adds the DAG into the bag, recurses into sub dags.
"""
self.dags[dag.dag_id] = dag
dag.resolve_template_files()
dag.last_loaded = datetime.now()
if self.sync_to_db:
session = settings.Session()
orm_dag = session.query(
DagModel).filter(DagModel.dag_id == dag.dag_id).first()
if not orm_dag:
orm_dag = DagModel(dag_id=dag.dag_id)
orm_dag.fileloc = root_dag.full_filepath
orm_dag.is_subdag = dag.is_subdag
orm_dag.owners = root_dag.owner
orm_dag.is_active = True
session.merge(orm_dag)
session.commit()
session.close()
for subdag in dag.subdags:
subdag.full_filepath = dag.full_filepath
subdag.parent_dag = dag
subdag.fileloc = root_dag.full_filepath
subdag.is_subdag = True
self.bag_dag(subdag, parent_dag=dag, root_dag=root_dag)
logging.info('Loaded DAG {dag}'.format(**locals()))
def collect_dags(
self,
dag_folder=DAGS_FOLDER,
only_if_updated=True):
"""
        Given a file path or a folder, this method looks for python modules,
        imports them and adds them to the dagbag collection.
        Note that if a .airflowignore file is found while processing
        the directory, it behaves much like a .gitignore does, ignoring
        files that match any of the regex patterns specified in the file.
"""
if os.path.isfile(dag_folder):
self.process_file(dag_folder, only_if_updated=only_if_updated)
elif os.path.isdir(dag_folder):
patterns = []
for root, dirs, files in os.walk(dag_folder):
ignore_file = [f for f in files if f == '.airflowignore']
if ignore_file:
f = open(os.path.join(root, ignore_file[0]), 'r')
patterns += [p for p in f.read().split('\n') if p]
f.close()
for f in files:
try:
filepath = os.path.join(root, f)
if not os.path.isfile(filepath):
continue
mod_name, file_ext = os.path.splitext(
os.path.split(filepath)[-1])
if file_ext != '.py':
continue
if not any([re.findall(p, filepath) for p in patterns]):
self.process_file(
filepath, only_if_updated=only_if_updated)
except:
pass
def deactivate_inactive_dags(self):
active_dag_ids = [dag.dag_id for dag in self.dags.values()]
session = settings.Session()
for dag in session.query(
DagModel).filter(~DagModel.dag_id.in_(active_dag_ids)).all():
dag.is_active = False
session.merge(dag)
session.commit()
session.close()
def paused_dags(self):
session = settings.Session()
dag_ids = [dp.dag_id for dp in session.query(DagModel).filter(
DagModel.is_paused == True)]
session.commit()
session.close()
return dag_ids
class BaseUser(Base):
__tablename__ = "user"
id = Column(Integer, primary_key=True)
username = Column(String(ID_LEN), unique=True)
email = Column(String(500))
def __repr__(self):
return self.username
def get_id(self):
return unicode(self.id)
class Connection(Base):
"""
    Placeholder to store connection information for different database
    instances. The idea here is that scripts use references to database
    instances (conn_id) instead of hard coding hostnames, logins and
    passwords when using operators or hooks.
"""
__tablename__ = "connection"
id = Column(Integer(), primary_key=True)
conn_id = Column(String(ID_LEN))
conn_type = Column(String(500))
host = Column(String(500))
schema = Column(String(500))
login = Column(String(500))
password = Column(String(500))
port = Column(Integer())
extra = Column(String(5000))
def __init__(
self, conn_id=None, conn_type=None,
host=None, login=None, password=None,
schema=None, port=None):
self.conn_id = conn_id
self.conn_type = conn_type
self.host = host
self.login = login
self.password = password
self.schema = schema
self.port = port
def get_hook(self):
from airflow import hooks
try:
if self.conn_type == 'mysql':
return hooks.MySqlHook(mysql_conn_id=self.conn_id)
elif self.conn_type == 'postgres':
return hooks.PostgresHook(postgres_conn_id=self.conn_id)
elif self.conn_type == 'hive_cli':
return hooks.HiveCliHook(hive_cli_conn_id=self.conn_id)
elif self.conn_type == 'presto':
return hooks.PrestoHook(presto_conn_id=self.conn_id)
elif self.conn_type == 'hiveserver2':
return hooks.HiveServer2Hook(hiveserver2_conn_id=self.conn_id)
elif self.conn_type == 'sqlite':
return hooks.SqliteHook(sqlite_conn_id=self.conn_id)
except:
return None
def __repr__(self):
return self.conn_id
class DagPickle(Base):
"""
Dags can originate from different places (user repos, master repo, ...)
and also get executed in different places (different executors). This
object represents a version of a DAG and becomes a source of truth for
a BackfillJob execution. A pickle is a native python serialized object,
and in this case gets stored in the database for the duration of the job.
The executors pick up the DagPickle id and read the dag definition from
the database.
"""
id = Column(Integer, primary_key=True)
pickle = Column(PickleType(pickler=dill))
created_dttm = Column(DateTime, default=func.now())
pickle_hash = Column(Integer)
__tablename__ = "dag_pickle"
def __init__(self, dag):
self.dag_id = dag.dag_id
if hasattr(dag, 'template_env'):
dag.template_env = None
self.pickle_hash = hash(dag)
self.pickle = dag
class TaskInstance(Base):
"""
    Task instances store the state of a task instance. This table is the
    authority and single source of truth around what tasks have run and the
    state they are in.
    The SqlAlchemy model deliberately doesn't have a SqlAlchemy foreign key to
    the task or dag model, to allow more control over transactions.
    Database transactions on this table should guard against double triggers
    and any confusion around which task instances are or aren't ready to run,
    even while multiple schedulers may be firing task instances.
"""
__tablename__ = "task_instance"
task_id = Column(String(ID_LEN), primary_key=True)
dag_id = Column(String(ID_LEN), primary_key=True)
execution_date = Column(DateTime, primary_key=True)
start_date = Column(DateTime)
end_date = Column(DateTime)
duration = Column(Integer)
state = Column(String(20))
try_number = Column(Integer)
hostname = Column(String(1000))
unixname = Column(String(1000))
job_id = Column(Integer)
pool = Column(String(50))
queue = Column(String(50))
priority_weight = Column(Integer)
__table_args__ = (
Index('ti_dag_state', dag_id, state),
Index('ti_state_lkp', dag_id, task_id, execution_date, state),
Index('ti_pool', pool, state, priority_weight),
)
def __init__(self, task, execution_date, state=None, job=None):
self.dag_id = task.dag_id
self.task_id = task.task_id
self.execution_date = execution_date
self.state = state
self.task = task
self.queue = task.queue
self.pool = task.pool
self.priority_weight = task.priority_weight_total
self.try_number = 1
self.unixname = getpass.getuser()
if job:
self.job_id = job.id
def command(
self,
mark_success=False,
ignore_dependencies=False,
force=False,
local=False,
pickle_id=None,
raw=False,
task_start_date=None,
job_id=None):
"""
        Returns a command that can be executed anywhere airflow is
        installed. This command is part of the message sent to executors by
        the orchestrator.
"""
iso = self.execution_date.isoformat()
mark_success = "--mark_success" if mark_success else ""
pickle = "--pickle {0}".format(pickle_id) if pickle_id else ""
job_id = "--job_id {0}".format(job_id) if job_id else ""
ignore_dependencies = "-i" if ignore_dependencies else ""
force = "--force" if force else ""
local = "--local" if local else ""
task_start_date = \
"-s " + task_start_date.isoformat() if task_start_date else ""
raw = "--raw" if raw else ""
subdir = ""
if not pickle and self.task.dag and self.task.dag.full_filepath:
subdir = "-sd DAGS_FOLDER/{0}".format(self.task.dag.filepath)
return (
"airflow run "
"{self.dag_id} {self.task_id} {iso} "
"{mark_success} "
"{pickle} "
"{local} "
"{ignore_dependencies} "
"{force} "
"{job_id} "
"{raw} "
"{subdir} "
"{task_start_date} "
).format(**locals())
@property
def log_filepath(self):
iso = self.execution_date.isoformat()
log = os.path.expanduser(conf.get('core', 'BASE_LOG_FOLDER'))
return (
"{log}/{self.dag_id}/{self.task_id}/{iso}.log".format(**locals()))
@property
def log_url(self):
iso = self.execution_date.isoformat()
BASE_URL = conf.get('webserver', 'BASE_URL')
return BASE_URL + (
"/admin/airflow/log"
"?dag_id={self.dag_id}"
"&task_id={self.task_id}"
"&execution_date={iso}"
).format(**locals())
def current_state(self, main_session=None):
"""
        Get the very latest state from the database. If a session is passed,
        we use it and looking up the state becomes part of that session;
        otherwise a new session is used.
"""
session = main_session or settings.Session()
TI = TaskInstance
ti = session.query(TI).filter(
TI.dag_id == self.dag_id,
TI.task_id == self.task_id,
TI.execution_date == self.execution_date,
).all()
if ti:
state = ti[0].state
else:
state = None
if not main_session:
session.commit()
session.close()
return state
def error(self, main_session=None):
"""
Forces the task instance's state to FAILED in the database.
"""
session = settings.Session()
logging.error("Recording the task instance as FAILED")
self.state = State.FAILED
session.merge(self)
session.commit()
session.close()
def refresh_from_db(self, main_session=None):
"""
Refreshes the task instance from the database based on the primary key
"""
session = main_session or settings.Session()
TI = TaskInstance
ti = session.query(TI).filter(
TI.dag_id == self.dag_id,
TI.task_id == self.task_id,
TI.execution_date == self.execution_date,
).first()
if ti:
self.state = ti.state
self.start_date = ti.start_date
self.end_date = ti.end_date
self.try_number = ti.try_number
if not main_session:
session.commit()
session.close()
@property
def key(self):
"""
Returns a tuple that identifies the task instance uniquely
"""
return (self.dag_id, self.task_id, self.execution_date)
def is_queueable(self):
"""
        Returns a boolean on whether the task instance has met all
        dependencies and is ready to run. It considers the task's state, the
        state of its dependencies and depends_on_past, and makes sure the
        execution date isn't in the future. It doesn't take into account
        whether the pool has a slot for it to run.
"""
if self.execution_date > datetime.now() - self.task.schedule_interval:
return False
elif self.state == State.UP_FOR_RETRY and not self.ready_for_retry():
return False
elif self.task.end_date and self.execution_date > self.task.end_date:
return False
elif (
self.state in State.runnable() and
self.are_dependencies_met()):
return True
else:
return False
def is_runnable(self):
"""
Returns whether a task is ready to run AND there's room in the
queue.
"""
return self.is_queueable() and not self.pool_full()
def are_dependents_done(self, main_session=None):
"""
Checks whether the dependents of this task instance have all succeeded.
This is meant to be used by wait_for_downstream.
This is useful when you do not want to start processing the next
schedule of a task until the dependents are done. For instance,
if the task DROPs and recreates a table.
"""
session = main_session or settings.Session()
task = self.task
if not task._downstream_list:
return True
downstream_task_ids = [t.task_id for t in task._downstream_list]
ti = session.query(func.count(TaskInstance.task_id)).filter(
TaskInstance.dag_id == self.dag_id,
TaskInstance.task_id.in_(downstream_task_ids),
TaskInstance.execution_date == self.execution_date,
TaskInstance.state == State.SUCCESS,
)
count = ti[0][0]
if not main_session:
session.commit()
session.close()
return count == len(task._downstream_list)
def are_dependencies_met(self, main_session=None):
"""
Returns a boolean on whether the upstream tasks are in a SUCCESS state
and considers depends_on_past and the previous run's state.
"""
TI = TaskInstance
# Using the session if passed as param
session = main_session or settings.Session()
task = self.task
# Checking that the depends_on_past is fulfilled
if (task.depends_on_past and
not self.execution_date == task.start_date):
previous_ti = session.query(TI).filter(
TI.dag_id == self.dag_id,
TI.task_id == task.task_id,
TI.execution_date ==
self.execution_date-task.schedule_interval,
TI.state == State.SUCCESS,
).first()
if not previous_ti:
return False
# Applying wait_for_downstream
previous_ti.task = self.task
if task.wait_for_downstream and not \
previous_ti.are_dependents_done(session):
return False
# Checking that all upstream dependencies have succeeded
if task._upstream_list:
upstream_task_ids = [t.task_id for t in task._upstream_list]
ti = session.query(func.count(TI.task_id)).filter(
TI.dag_id == self.dag_id,
TI.task_id.in_(upstream_task_ids),
TI.execution_date == self.execution_date,
TI.state == State.SUCCESS,
)
count = ti[0][0]
if count < len(task._upstream_list):
return False
if not main_session:
session.commit()
session.close()
return True
def __repr__(self):
return (
"<TaskInstance: {ti.dag_id}.{ti.task_id} "
"{ti.execution_date} [{ti.state}]>"
).format(ti=self)
def ready_for_retry(self):
"""
        Checks whether the task instance is in the right state and timeframe
        to be retried.
"""
return self.state == State.UP_FOR_RETRY and \
self.end_date + self.task.retry_delay < datetime.now()
@provide_session
def pool_full(self, session):
"""
Returns a boolean as to whether the slot pool has room for this
task to run
"""
if not self.task.pool:
return False
pool = (
session
.query(Pool)
.filter(Pool.pool == self.task.pool)
.first()
)
if not pool:
return False
open_slots = pool.open_slots(session=session)
return open_slots <= 0
def run(
self,
verbose=True,
ignore_dependencies=False, # Doesn't check for deps, just runs
force=False, # Disregards previous successes
mark_success=False, # Don't run the task, act as if it succeeded
test_mode=False, # Doesn't record success or failure in the DB
job_id=None,):
"""
Runs the task instance.
"""
task = self.task
session = settings.Session()
self.refresh_from_db(session)
session.commit()
self.job_id = job_id
iso = datetime.now().isoformat()
self.hostname = socket.gethostname()
msg = "\n"
msg += ("-" * 80)
if self.state == State.UP_FOR_RETRY:
msg += "\nRetry run {self.try_number} out of {task.retries} "
msg += "starting @{iso}\n"
else:
msg += "\nNew run starting @{iso}\n"
msg += ("-" * 80)
logging.info(msg.format(**locals()))
if not force and self.state == State.SUCCESS:
logging.info(
"Task {self} previously succeeded"
" on {self.end_date}".format(**locals())
)
elif not ignore_dependencies and \
not self.are_dependencies_met(session):
logging.warning("Dependencies not met yet")
elif self.state == State.UP_FOR_RETRY and \
not self.ready_for_retry():
next_run = (self.end_date + task.retry_delay).isoformat()
logging.info(
"Not ready for retry yet. " +
"Next run after {0}".format(next_run)
)
elif force or self.state in State.runnable():
self.start_date = datetime.now()
if not force and task.pool and self.pool_full(session=session):
self.state = State.QUEUED
session.merge(self)
session.commit()
session.close()
logging.info("Pool {} is full, queuing".format(task.pool))
return
if self.state == State.UP_FOR_RETRY:
self.try_number += 1
else:
self.try_number = 1
if not test_mode:
session.add(Log(State.RUNNING, self))
self.state = State.RUNNING
self.end_date = None
if not test_mode:
session.merge(self)
session.commit()
if verbose:
if mark_success:
msg = "Marking success for "
else:
msg = "Executing "
msg += "{self.task} for {self.execution_date}"
try:
logging.info(msg.format(self=self))
if not mark_success:
task_copy = copy.copy(task)
self.task = task_copy
def signal_handler(signum, frame):
'''Setting kill signal handler'''
logging.error("Killing subprocess")
task_copy.on_kill()
raise Exception("Task received SIGTERM signal")
signal.signal(signal.SIGTERM, signal_handler)
self.render_templates()
task_copy.execute(context=self.get_template_context())
except (Exception, StandardError, KeyboardInterrupt) as e:
self.record_failure(e, test_mode)
raise e
# Recording SUCCESS
session = settings.Session()
self.end_date = datetime.now()
self.set_duration()
self.state = State.SUCCESS
if not test_mode:
session.add(Log(State.SUCCESS, self))
session.merge(self)
session.commit()
def record_failure(self, error, test_mode=False):
logging.exception(error)
task = self.task
session = settings.Session()
self.end_date = datetime.now()
self.set_duration()
if not test_mode:
session.add(Log(State.FAILED, self))
# Let's go deeper
try:
if self.try_number <= task.retries:
self.state = State.UP_FOR_RETRY
if task.email_on_retry and task.email:
self.email_alert(error, is_retry=True)
else:
self.state = State.FAILED
if task.email_on_failure and task.email:
self.email_alert(error, is_retry=False)
except Exception as e2:
logging.error(
'Failed to send email to: ' + str(task.email))
logging.error(str(e2))
if not test_mode:
session.merge(self)
session.commit()
logging.error(str(error))
def get_template_context(self):
task = self.task
from airflow import macros
tables = None
if 'tables' in task.params:
tables = task.params['tables']
ds = self.execution_date.isoformat()[:10]
yesterday_ds = (self.execution_date - timedelta(1)).isoformat()[:10]
tomorrow_ds = (self.execution_date + timedelta(1)).isoformat()[:10]
ds_nodash = ds.replace('-', '')
ti_key_str = "{task.dag_id}__{task.task_id}__{ds_nodash}"
ti_key_str = ti_key_str.format(**locals())
params = {}
if hasattr(task, 'dag') and task.dag.params:
params.update(task.dag.params)
if task.params:
params.update(task.params)
return {
'dag': task.dag,
'ds': ds,
'yesterday_ds': yesterday_ds,
'tomorrow_ds': tomorrow_ds,
'END_DATE': ds,
'ds_nodash': ds_nodash,
'end_date': ds,
'execution_date': self.execution_date,
'latest_date': ds,
'macros': macros,
'params': params,
'tables': tables,
'task': task,
'task_instance': self,
'ti': self,
'task_instance_key_str': ti_key_str
}
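    # The keys above are what templated fields can reference. For example, a
    # hypothetical templated SQL string rendered by render_templates() below:
    #
    #   "SELECT * FROM my_table WHERE ds = '{{ ds }}'"
    #   # becomes, for an execution_date of 2015-01-01:
    #   "SELECT * FROM my_table WHERE ds = '2015-01-01'"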
def render_templates(self):
task = self.task
jinja_context = self.get_template_context()
if hasattr(self, 'task') and hasattr(self.task, 'dag'):
if self.task.dag.user_defined_macros:
jinja_context.update(
self.task.dag.user_defined_macros)
rt = self.task.render_template # shortcut to method
for attr in task.__class__.template_fields:
content = getattr(task, attr)
if content:
if isinstance(content, basestring):
result = rt(content, jinja_context)
elif isinstance(content, (list, tuple)):
result = [rt(s, jinja_context) for s in content]
elif isinstance(content, dict):
result = {
k: rt(content[k], jinja_context) for k in content}
else:
raise Exception("Type not supported for templating")
setattr(task, attr, result)
def email_alert(self, exception, is_retry=False):
task = self.task
title = "Airflow alert: {self}".format(**locals())
exception = str(exception).replace('\n', '<br>')
try_ = task.retries + 1
body = (
"Try {self.try_number} out of {try_}<br>"
"Exception:<br>{exception}<br>"
"Log: <a href='{self.log_url}'>Link</a><br>"
"Host: {self.hostname}<br>"
"Log file: {self.log_filepath}<br>"
).format(**locals())
utils.send_email(task.email, title, body)
def set_duration(self):
if self.end_date and self.start_date:
self.duration = (self.end_date - self.start_date).seconds
else:
self.duration = None
class Log(Base):
"""
Used to actively log events to the database
"""
__tablename__ = "log"
id = Column(Integer, primary_key=True)
dttm = Column(DateTime)
dag_id = Column(String(ID_LEN))
task_id = Column(String(ID_LEN))
event = Column(String(30))
execution_date = Column(DateTime)
owner = Column(String(500))
def __init__(self, event, task_instance):
self.dttm = datetime.now()
self.dag_id = task_instance.dag_id
self.task_id = task_instance.task_id
self.execution_date = task_instance.execution_date
self.event = event
self.owner = task_instance.task.owner
class BaseOperator(object):
"""
    Abstract base class for all operators. Since operators create objects that
    become nodes in the DAG, BaseOperator contains many recursive methods for
    DAG crawling behavior. To derive from this class, you are expected to
    override the constructor as well as the 'execute' method.
    Operators derived from this class should perform or trigger certain tasks
    synchronously (wait for completion). Examples of operators could be an
    operator that runs a Pig job (PigOperator), a sensor operator that
    waits for a partition to land in Hive (HiveSensorOperator), or one that
    moves data from Hive to MySQL (Hive2MySqlOperator). Instances of these
    operators (tasks) target specific operations, running specific scripts,
    functions or data transfers.
This class is abstract and shouldn't be instantiated. Instantiating a
class derived from this one results in the creation of a task object,
which ultimately becomes a node in DAG objects. Task dependencies should
be set by using the set_upstream and/or set_downstream methods.
    Note that this class is derived from SQLAlchemy's Base class, which
    allows us to push metadata regarding tasks to the database. Classes
    deriving from this one need to implement the polymorphic specificities
    documented in SQLAlchemy. This should become clear while reading the
    code for other operators.
:param task_id: a unique, meaningful id for the task
:type task_id: string
:param owner: the owner of the task, using the unix username is recommended
:type owner: string
:param retries: the number of retries that should be performed before
failing the task
:type retries: int
:param retry_delay: delay between retries
:type retry_delay: timedelta
:param start_date: start date for the task, the scheduler will start from
this point in time
:type start_date: datetime
:param end_date: if specified, the scheduler won't go beyond this date
:type end_date: datetime
:param schedule_interval: interval at which to schedule the task
:type schedule_interval: timedelta
    :param depends_on_past: when set to true, task instances will run
        sequentially, relying on the previous task instance's schedule to
        have succeeded. The task instance for the start_date is allowed to run.
:type depends_on_past: bool
    :param wait_for_downstream: when set to true, an instance of task X will
        wait for the downstream tasks of the previous instance of task X to
        finish before it moves on to the next schedule. This is useful if the
        different instances of a task X alter the same asset, and this asset
        is used by the downstream tasks of task X.
:type wait_for_downstream: bool
    :param queue: which queue to target when running this job. Not
        all executors implement queue management; the CeleryExecutor
        does support targeting specific queues.
:type queue: str
:param dag: a reference to the dag the task is attached to (if any)
:type dag: DAG
    :param priority_weight: priority weight of this task against other tasks.
This allows the executor to trigger higher priority tasks before
others when things get backed up.
:type priority_weight: int
:param pool: the slot pool this task should run in, slot pools are a
way to limit concurrency for certain tasks
:type pool: str
"""
# For derived classes to define which fields will get jinjaified
template_fields = []
    # Defines which file extensions to look for in the templated fields
template_ext = []
# Defines the color in the UI
ui_color = '#fff'
ui_fgcolor = '#000'
@apply_defaults
def __init__(
self,
task_id,
owner,
email=None,
email_on_retry=True,
email_on_failure=True,
retries=0,
retry_delay=timedelta(seconds=300),
start_date=None,
end_date=None,
schedule_interval=timedelta(days=1),
depends_on_past=False,
wait_for_downstream=False,
dag=None,
params=None,
default_args=None,
adhoc=False,
priority_weight=1,
queue=None,
pool=None,