
Add only_run_latest to BaseOperator #59

Closed
mistercrunch opened this issue Jun 22, 2015 · 14 comments
@mistercrunch
Member

Dependencies on an only_run_latest task are satisfied once that task has succeeded at any execution date at or after that of the task depending on it.

When backfilling, only the task instance at the end_date of an only_run_latest task would be triggered, and dependencies on this task across the whole time range would be satisfied once that latest task instance completes.

When the scheduler encounters an only_run_latest task, it should skip runs up to max(execution_date) across all of its dependencies.
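As a rough illustration of the proposed semantics in plain Python (should_run is a hypothetical helper, not an actual Airflow API):

```python
from datetime import datetime

def should_run(execution_date, pending_dates):
    """Hypothetical only_run_latest rule: of all pending runs, only the one
    with the latest execution date actually executes; the earlier ones are
    skipped, and downstream dependencies treat them as satisfied."""
    return execution_date == max(pending_dates)

# A week of pending runs: only the latest one would execute.
pending = [datetime(2015, 6, d) for d in range(15, 22)]
assert should_run(datetime(2015, 6, 21), pending)
assert not should_run(datetime(2015, 6, 15), pending)
```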

@thibault-ketterer
Contributor

+1. Furthermore, to me, having an "only_run_latest" feature makes the "skip/do not run" proposal in #262 unnecessary.

@martingrayson

I have a weekly job that bulk loads some data. My understanding of the scheduler is that if I have a schedule interval of a week, once that week has elapsed and the scheduler kicks off the dag, I'll see 7 sequential executions (if depends_on_past=True).

This isn't the functionality I need: I'd like one dag run per week, and I don't even use the execution date parameter in my dag. Is this a solid case for only_run_latest, or have I misunderstood the feature? Is there a way I can achieve this before the feature is implemented, using some sort of hack within my dag?

@mistercrunch
Member Author

I think it's unrelated; it sounds to me like you need a DAG with schedule_interval=timedelta(7). The tasks in there will get executed once a week.

Only run latest would mean that if task a depends on task b, and task b fails for 10 days and recovers afterwards, task a will only run for that last day. Also, tasks depending on task a will have that dependency satisfied for all days up until the latest run.
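For clarity, here is how a schedule_interval=timedelta(7) DAG gets scheduled, sketched in plain Python (weekly_run_dates is an illustrative helper, not Airflow code): the scheduler produces one execution date per elapsed interval, not one per day.

```python
from datetime import datetime, timedelta

def weekly_run_dates(start_date, now, interval=timedelta(days=7)):
    """Sketch of interval-based scheduling: a run for execution date d is
    created once the full interval after d has elapsed."""
    dates, current = [], start_date
    while current + interval <= now:
        dates.append(current)
        current += interval
    return dates

# A month after a September 1st start date, a weekly DAG has had
# four runs (Sep 1, 8, 15, 22), not thirty daily ones.
runs = weekly_run_dates(datetime(2015, 9, 1), datetime(2015, 9, 30))
assert len(runs) == 4
```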

@martingrayson

I've been playing with this by having a start date of September 1st and schedule_interval=timedelta(7). When running the scheduler for the first time I saw a backfill over this date range, i.e. multiple task runs, one for each day that has elapsed.

What I'd actually like is one task run (rather than a month's worth), as it's inefficient for my process to run multiple times. Is there not a way of either marking the dag as success or skipping an execution if the run date isn't the maximum possible run date? I guess the first scenario I describe only happens when the dag is seeded, but what if my host has problems and my dag fails to run for a month? I'd like to see a single dag execution in this scenario.

@martingrayson

Sorry for hijacking this thread, but I thought I'd drop this here in case anyone else has the same requirement or I'm doing something insane (likely).

As my job is weekly and is a bulk process that can't be incrementally run as 7 dag executions, I've used the BranchPythonOperator to skip certain dag runs. Essentially I check {ds} when the dag is kicked off and branch to a DummyOperator if {ds} isn't yesterday's date. This means my weekly dag has 6 dummy runs and 1 logical run when it's scheduled.
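The {ds} check described above can be sketched in plain Python (the task ids 'do_something' and 'do_nothing' are illustrative):

```python
from datetime import date, timedelta

def choose_branch(ds):
    """Return the task id to branch to for execution date string ds
    (YYYY-MM-DD): only the run whose date is yesterday does real work."""
    yesterday = (date.today() - timedelta(days=1)).isoformat()
    return 'do_something' if ds == yesterday else 'do_nothing'

# A backfilled run from the distant past branches to the no-op task:
assert choose_branch('2000-01-01') == 'do_nothing'
# The most recent run (execution date = yesterday) does the real work:
assert choose_branch((date.today() - timedelta(days=1)).isoformat()) == 'do_something'
```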

@r39132
Contributor

r39132 commented Feb 22, 2016

It feels like this thread might have died.

We have a use case for running daily DB snapshots using Airflow. We are running DB backups/snapshotting in Airflow so that we can coordinate a DB snapshot with pausing the DAGs that depend on that DB while it is down. If the DB snapshot DAG is paused for 3 days and then unpaused, we don't want it to run the DB snapshot for each of the past 3 days. We have a workaround that checks the date logic in the BashOperator itself, but I am wondering if we can leverage the only_run_latest idea as well.

@criccomini
Contributor

+1 This feature would be really useful. Basically, any DAG that doesn't use incremental date-based partitioning would benefit from this.

A trivial example is a DAG that simply snapshots a full DB and loads it into a BLOB store. If it runs once, you're caught up. If you have the DAG set to run hourly, and miss an hour for whatever reason, there's no point in snapshotting the DB and loading it back-to-back. Just run the latest one, and skip the rest.

@r39132 r39132 self-assigned this Mar 16, 2016
@r39132
Contributor

r39132 commented Mar 16, 2016

To address a question asked by @thibault-ketterer: this issue differs from #262.

only_run_latest would be used for a job like a daily DB snapshot job. Typically, this type of job needs to run once a day. If it doesn't run for 3 days because the DAG was paused, then when the DAG is unpaused, the job should not run 3 times. It should only run for the latest time interval.

What I propose in #262 is to mark entries manually as "skipped / do not run" so that the scheduler or backfill job does not attempt them. This could be because we know there is a data problem in that time range.

@r39132
Contributor

r39132 commented Mar 16, 2016

I'm starting to look into an implementation. It seems that this functionality could be implemented by skipping DAG Runs that are not the latest. Unfortunately, the current handling of "skipped" is not compatible with depends_on_past=True: if we skipped a bunch of DAG Runs and the DAG specified depends_on_past=True, the latest run would never be queued. It seems that we first need the system to support depends_on_past=True for skipped tasks.

This is a major roadmap item and is tracked in #1155
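The incompatibility described above can be sketched as a simplified scheduling rule (illustrative only, not the actual scheduler code):

```python
def can_queue(prev_run_state, depends_on_past):
    """Simplified dependency rule: with depends_on_past=True a run is
    queued only if the previous run succeeded, so a chain of skipped
    runs blocks the latest run from ever starting."""
    return (not depends_on_past) or prev_run_state == 'success'

assert can_queue('success', depends_on_past=True)
assert not can_queue('skipped', depends_on_past=True)   # latest run never queues
assert can_queue('skipped', depends_on_past=False)
```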

@pkexcellent

Hi, I really need to use the only_run_latest feature, but I don't know how to use it in my code. Has this feature not been developed yet, or have I just missed it? Can you give me an example of the task declaration? If this feature is not developed, how can I implement my code to get the only_run_latest behavior? Thanks!

@rproepp

rproepp commented Mar 30, 2016

This feature would be really important for my use case as well. If I understand this correctly, Airflow currently cannot be used to replace a simple cron job (without workarounds) because it will always try to backfill.

@dkromm

dkromm commented Apr 28, 2016

Any update on this? My use case needs an option like this in the operator.

@liuhenry

+1 to this feature! However, in the interim, you can get a workaround using the BranchPythonOperator (@martingrayson mentioned something similar):

from datetime import date, datetime

from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.postgres_operator import PostgresOperator
from airflow.operators.python_operator import BranchPythonOperator

def current_date_function(ds, **kwargs):
    # Only the run whose execution date is today does real work;
    # all other (backfilled) runs branch to the no-op task.
    if date.today() == datetime.strptime(ds, '%Y-%m-%d').date():
        return 'do_something'
    return 'do_nothing'

check_current_date = BranchPythonOperator(
    task_id='check_current_date',
    provide_context=True,
    python_callable=current_date_function,
    dag=dag)

do_nothing = DummyOperator(
    task_id='do_nothing',
    dag=dag)

do_something = PostgresOperator(
    task_id='do_something',
    sql='SELECT 1;',  # placeholder; your real query goes here
    dag=dag)

check_current_date.set_downstream(do_nothing)
check_current_date.set_downstream(do_something)

@benjiao

benjiao commented May 2, 2016

+1 on this feature. This will be very useful for snapshot/update-to-latest type tasks as @r39132 mentioned.

@kaxil kaxil closed this as completed Mar 20, 2020
mobuchowski added a commit to mobuchowski/airflow that referenced this issue Jan 4, 2022
rajatsri28 pushed a commit to rajatsri28/airflow that referenced this issue Jan 25, 2022