Skip to content

Commit c4e8959

Browse files
Add cli command for 'airflow dags reserialize` (#19471)
1 parent 7d555d7 commit c4e8959

File tree

4 files changed

+55
-0
lines changed

4 files changed

+55
-0
lines changed

airflow/cli/cli_parser.py

+18
Original file line numberDiff line numberDiff line change
@@ -769,6 +769,13 @@ def _check(value):
769769
help="The maximum number of triggers that a Triggerer will run at one time.",
770770
)
771771

772+
# reserialize
773+
ARG_CLEAR_ONLY = Arg(
774+
("--clear-only",),
775+
action="store_true",
776+
help="If passed, serialized DAGs will be cleared but not reserialized.",
777+
)
778+
772779
ALTERNATIVE_CONN_SPECS_ARGS = [
773780
ARG_CONN_TYPE,
774781
ARG_CONN_DESCRIPTION,
@@ -977,6 +984,17 @@ class GroupCommand(NamedTuple):
977984
ARG_SAVE_DAGRUN,
978985
),
979986
),
987+
ActionCommand(
988+
name='reserialize',
989+
help="Reserialize all DAGs by parsing the DagBag files",
990+
description=(
991+
"Drop all serialized dags from the metadata DB. This will cause all DAGs to be reserialized "
992+
"from the DagBag folder. This can be helpful if your serialized DAGs get out of sync with the "
993+
"version of Airflow that you are running."
994+
),
995+
func=lazy_load_command('airflow.cli.commands.dag_command.dag_reserialize'),
996+
args=(ARG_CLEAR_ONLY,),
997+
),
980998
)
981999
TASKS_COMMANDS = (
9821000
ActionCommand(

airflow/cli/commands/dag_command.py

+12
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
from airflow.jobs.base_job import BaseJob
3838
from airflow.models import DagBag, DagModel, DagRun, TaskInstance
3939
from airflow.models.dag import DAG
40+
from airflow.models.serialized_dag import SerializedDagModel
4041
from airflow.utils import cli as cli_utils
4142
from airflow.utils.cli import (
4243
get_dag,
@@ -441,3 +442,14 @@ def dag_test(args, session=None):
441442
_display_dot_via_imgcat(dot_graph)
442443
if show_dagrun:
443444
print(dot_graph.source)
445+
446+
447+
@provide_session
448+
@cli_utils.action_logging
449+
def dag_reserialize(args, session=None):
450+
session.query(SerializedDagModel).delete(synchronize_session=False)
451+
452+
if not args.clear_only:
453+
dagbag = DagBag()
454+
dagbag.collect_dags(only_if_updated=False, safe_mode=False)
455+
dagbag.sync_to_db()

docs/spelling_wordlist.txt

+3
Original file line numberDiff line numberDiff line change
@@ -302,6 +302,7 @@ Redhat
302302
ReidentifyContentResponse
303303
Reinitialising
304304
Remoting
305+
Reserialize
305306
ResourceRequirements
306307
Roadmap
307308
Robinhood
@@ -1167,6 +1168,8 @@ replicaSet
11671168
repo
11681169
repos
11691170
reqs
1171+
reserialize
1172+
reserialized
11701173
resetdb
11711174
resourceVersion
11721175
resultset

tests/cli/commands/test_dag_command.py

+22
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
from airflow.cli.commands import dag_command
3131
from airflow.exceptions import AirflowException
3232
from airflow.models import DagBag, DagModel, DagRun
33+
from airflow.models.serialized_dag import SerializedDagModel
3334
from airflow.utils import timezone
3435
from airflow.utils.session import create_session
3536
from airflow.utils.state import State
@@ -62,6 +63,27 @@ def tearDownClass(cls) -> None:
6263
clear_db_runs()
6364
clear_db_dags()
6465

66+
def test_reserialize(self):
67+
# Assert that there are serialized Dags
68+
with create_session() as session:
69+
serialized_dags_before_command = session.query(SerializedDagModel).all()
70+
assert len(serialized_dags_before_command) # There are serialized DAGs to delete
71+
72+
# Run clear of serialized dags
73+
dag_command.dag_reserialize(self.parser.parse_args(['dags', 'reserialize', "--clear-only"]))
74+
# Assert no serialized Dags
75+
with create_session() as session:
76+
serialized_dags_after_clear = session.query(SerializedDagModel).all()
77+
assert not len(serialized_dags_after_clear)
78+
79+
# Serialize manually
80+
dag_command.dag_reserialize(self.parser.parse_args(['dags', 'reserialize']))
81+
82+
# Check serialized DAGs are back
83+
with create_session() as session:
84+
serialized_dags_after_reserialize = session.query(SerializedDagModel).all()
85+
assert len(serialized_dags_after_reserialize) >= 40 # Serialized DAGs back
86+
6587
@mock.patch("airflow.cli.commands.dag_command.DAG.run")
6688
def test_backfill(self, mock_run):
6789
dag_command.dag_backfill(

0 commit comments

Comments
 (0)