Skip to content

Commit 6aa4b33

Browse files
committed
Backfill start_date to override the tasks's
1 parent 50dcce8 commit 6aa4b33

File tree

4 files changed

+17
-1
lines changed

4 files changed

+17
-1
lines changed

airflow/bin/cli.py

+8
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,10 @@ def run(args):
100100
dag = dag_pickle.pickle
101101
task = dag.get_task(task_id=args.task_id)
102102

103+
task_start_date = None
104+
if args.task_start_date:
105+
task_start_date = dateutil.parser.parse(args.task_start_date)
106+
task.start_date = task_start_date
103107
ti = TaskInstance(task, args.execution_date)
104108

105109
if args.local:
@@ -109,6 +113,7 @@ def run(args):
109113
mark_success=args.mark_success,
110114
force=args.force,
111115
pickle_id=args.pickle,
116+
task_start_date=task_start_date,
112117
ignore_dependencies=args.ignore_dependencies)
113118
run_job.run()
114119
elif args.raw:
@@ -394,6 +399,9 @@ def get_parser():
394399
parser_run.add_argument(
395400
"-sd", "--subdir", help=subdir_help,
396401
default=DAGS_FOLDER)
402+
parser_run.add_argument(
403+
"-s", "--task_start_date",
404+
help="Override the tasks's start_date (used internally)",)
397405
parser_run.add_argument(
398406
"-m", "--mark_success", help=mark_success_help, action="store_true")
399407
parser_run.add_argument(

airflow/executors/base_executor.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -36,12 +36,13 @@ def queue_command(self, key, command, priority=1, queue=None):
3636

3737
def queue_task_instance(
3838
self, task_instance, mark_success=False, pickle_id=None,
39-
force=False, ignore_dependencies=False):
39+
force=False, ignore_dependencies=False, task_start_date=None):
4040
command = task_instance.command(
4141
local=True,
4242
mark_success=mark_success,
4343
force=force,
4444
ignore_dependencies=ignore_dependencies,
45+
task_start_date=task_start_date,
4546
pickle_id=pickle_id)
4647
self.queue_command(
4748
task_instance.key,

airflow/jobs.py

+4
Original file line numberDiff line numberDiff line change
@@ -508,6 +508,7 @@ def _execute(self):
508508
executor.queue_task_instance(
509509
ti,
510510
mark_success=self.mark_success,
511+
task_start_date=self.bf_start_date,
511512
pickle_id=pickle_id)
512513
ti.state = State.RUNNING
513514
if key not in started:
@@ -556,12 +557,14 @@ def __init__(
556557
force=False,
557558
mark_success=False,
558559
pickle_id=None,
560+
task_start_date=None,
559561
*args, **kwargs):
560562
self.task_instance = task_instance
561563
self.ignore_dependencies = ignore_dependencies
562564
self.force = force
563565
self.pickle_id = pickle_id
564566
self.mark_success = mark_success
567+
self.task_start_date = task_start_date
565568
super(LocalTaskJob, self).__init__(*args, **kwargs)
566569

567570
def _execute(self):
@@ -571,6 +574,7 @@ def _execute(self):
571574
force=self.force,
572575
pickle_id=self.pickle_id,
573576
mark_success=self.mark_success,
577+
task_start_date=self.task_start_date,
574578
job_id=self.id,
575579
)
576580
self.process = subprocess.Popen(['bash', '-c', command])

airflow/models.py

+3
Original file line numberDiff line numberDiff line change
@@ -411,6 +411,7 @@ def command(
411411
local=False,
412412
pickle_id=None,
413413
raw=False,
414+
task_start_date=None,
414415
job_id=None):
415416
"""
416417
Returns a command that can be executed anywhere where airflow is
@@ -424,6 +425,8 @@ def command(
424425
ignore_dependencies = "-i" if ignore_dependencies else ""
425426
force = "--force" if force else ""
426427
local = "--local" if local else ""
428+
task_start_date = \
429+
"-s " + task_start_date.isoformat() if task_start_date else ""
427430
raw = "--raw" if raw else ""
428431
subdir = ""
429432
if not pickle and self.task.dag and self.task.dag.full_filepath:

0 commit comments

Comments
 (0)