
Dataset-aware scheduling - is there a way to reset a DAG? #36618

Answered by funes79
funes79 asked this question in Q&A

I came up with this workaround: whenever the dataset-scheduled DAG gets "out of sync", I reset its dataset state by running this helper DAG. Essentially, it clears out the DAG's dataset queue.

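```python
def reset_dataset_queues(**kwargs):
    # Import inside the callable so these modules are not loaded every time
    # the scheduler parses the DAG file.
    from airflow.models.dataset import DatasetDagRunQueue
    from airflow.utils.session import create_session
    from sqlalchemy import delete, select

    params = kwargs.get("params", {})
    print("Resetting for", params)

    # The DAG whose pending dataset events should be dropped, passed in via params.
    dag_id = params.get("dag_id")
    if not dag_id:
        raise ValueError("params must include a 'dag_id'")

    with create_session() as session:
        # Delete every queued dataset event that targets this DAG.
        session.execute(delete(DatasetDagRunQueue).where(DatasetDagRunQueue.target_dag_id == dag_id))
    print(session.execute(select(DatasetDagRunQueue).where(Dataset…
```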
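To see why clearing the queue can help, here is a simplified, hypothetical pure-Python model of dataset-triggered scheduling. It assumes the documented behavior that a DAG scheduled on several datasets runs once each of them has been updated at least once since its last run, and that repeated updates of the same dataset before that point count only once. The names `DatasetScheduledDag`, `on_dataset_updated` and `reset_queue` are made up for illustration and are not Airflow APIs; `queue` merely stands in for the `DatasetDagRunQueue` rows that target the DAG.

```python
from dataclasses import dataclass, field


# Hypothetical model, not Airflow code. Assumption: the DAG keeps one pending
# entry per dataset and is triggered only when every dataset has an entry,
# after which all entries are cleared.
@dataclass
class DatasetScheduledDag:
    dag_id: str
    datasets: frozenset
    # Stand-in for the DatasetDagRunQueue rows targeting this DAG.
    queue: set = field(default_factory=set)
    runs: int = 0

    def on_dataset_updated(self, uri: str) -> bool:
        """Record a dataset update; return True if this update triggers a run."""
        if uri not in self.datasets:
            return False
        self.queue.add(uri)  # repeated updates of one dataset collapse into one entry
        if self.queue == self.datasets:
            self.runs += 1
            self.queue.clear()
            return True
        return False

    def reset_queue(self) -> None:
        """Model of the workaround: drop all pending entries for this DAG."""
        self.queue.clear()


if __name__ == "__main__":
    dag = DatasetScheduledDag("report", frozenset({"a", "b"}))
    dag.on_dataset_updated("a")         # day 1: A lands, B's producer fails
    print(dag.on_dataset_updated("b"))  # day 2: B lands first, run fires with day-1 A
    dag.on_dataset_updated("a")         # day 2: A lands, now waits for day-3 B
    print(sorted(dag.queue))            # ['a']
    dag.reset_queue()                   # what the helper DAG does to the real queue
    print(sorted(dag.queue))            # []
```

In this model, if one producer misses a day and the other dataset then lands first, the run fires early with stale data and the fresh update is left waiting for the next cycle, which is one plausible reading of "out of sync". Clearing the queue drops the stale entry so the next run again waits for both datasets.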
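The helper deletes the queue rows for one `dag_id` and then appears to query the table again to confirm the result. The sketch below reproduces that delete-then-verify pattern with the standard-library `sqlite3` module against a hypothetical in-memory table loosely modeled on Airflow's dataset queue; the table name and columns are assumptions, and on a real deployment the ORM-based function against the Airflow metadata database is the relevant code. It shows that only rows for the given DAG are removed and that the check with the same filter comes back empty.

```python
import sqlite3

# Hypothetical stand-in table, loosely modeled on Airflow's dataset queue.
# The real schema is owned by Airflow and may differ between versions.
SCHEMA = """
CREATE TABLE dataset_dag_run_queue (
    dataset_id    INTEGER NOT NULL,
    target_dag_id TEXT    NOT NULL,
    created_at    TEXT    NOT NULL,
    PRIMARY KEY (dataset_id, target_dag_id)
)
"""


def reset_dataset_queue(conn: sqlite3.Connection, dag_id: str) -> int:
    """Delete all pending queue rows that target dag_id; return the number removed."""
    with conn:  # commits on success, rolls back if the DELETE raises
        cur = conn.execute(
            "DELETE FROM dataset_dag_run_queue WHERE target_dag_id = ?", (dag_id,)
        )
    return cur.rowcount


def pending_for(conn: sqlite3.Connection, dag_id: str) -> list:
    """Re-query with the same filter to confirm nothing is left for dag_id."""
    return conn.execute(
        "SELECT dataset_id, target_dag_id FROM dataset_dag_run_queue"
        " WHERE target_dag_id = ?",
        (dag_id,),
    ).fetchall()


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.execute(SCHEMA)
    conn.executemany(
        "INSERT INTO dataset_dag_run_queue VALUES (?, ?, datetime('now'))",
        [(1, "report"), (2, "report"), (1, "other_dag")],
    )
    print(reset_dataset_queue(conn, "report"))  # 2
    print(pending_for(conn, "report"))          # []
    print(pending_for(conn, "other_dag"))       # [(1, 'other_dag')]
```

Scoping the DELETE by `target_dag_id` is the important design point: queues belonging to other dataset-scheduled DAGs are left untouched.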

Replies: 2 comments, 3 replies

Comment 1: 0 replies

Comment 2: 3 replies, from @cmarteepants, @Amin-Siddique and @SuaveCharlie
Answer selected by funes79
Category: Q&A
Labels: None yet
Participants: 4