Skip to content

Commit 5875a15

Browse files
vshshjn7Vishesh Jainaoen
authored
[EWT-115][EWT-118] Initialise dag var to None and fix for DagModel.fileloc (missed in EWT-16) (twitter-forks#39)
Co-authored-by: Vishesh Jain <visheshj@twitter.com> Co-authored-by: aoen <aoen@users.noreply.github.com>
1 parent 711b4f7 commit 5875a15

File tree

5 files changed

+9
-8
lines changed

5 files changed

+9
-8
lines changed

airflow/api/common/experimental/delete_dag.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,9 @@ def delete_dag(dag_id, keep_records_in_log=True, session=None):
4141
if dag is None:
4242
raise DagNotFound("Dag id {} not found".format(dag_id))
4343

44-
if dag.fileloc and os.path.exists(dag.fileloc):
44+
if dag.get_local_fileloc() and os.path.exists(dag.get_local_fileloc()):
4545
raise DagFileExists("Dag id {} is still in DagBag. "
46-
"Remove the DAG file first: {}".format(dag_id, dag.fileloc))
46+
"Remove the DAG file first: {}".format(dag_id, dag.get_local_fileloc()))
4747

4848
count = 0
4949

airflow/api/common/experimental/get_code.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ def get_code(dag_id): # type (str) -> str
3131
dag = check_and_get_dag(dag_id=dag_id)
3232

3333
try:
34-
with wwwutils.open_maybe_zipped(dag.fileloc, 'r') as file:
34+
with wwwutils.open_maybe_zipped(dag.get_local_fileloc(), 'r') as file:
3535
code = file.read()
3636
return code
3737
except IOError as exception:

airflow/models/dagbag.py

+3-2
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ def get_dag(self, dag_id):
117117
"""
118118
from airflow.models.dag import DagModel # Avoid circular import
119119

120+
dag = None
120121
# If asking for a known subdag, we want to refresh the parent
121122
root_dag_id = dag_id
122123
if dag_id in self.dags:
@@ -135,7 +136,7 @@ def get_dag(self, dag_id):
135136
):
136137
# Reprocess source file
137138
# TODO: remove the below hack to find relative dag location in webserver
138-
filepath = dag.fileloc if dag else orm_dag.fileloc
139+
filepath = dag.get_local_fileloc() if dag else orm_dag.get_local_fileloc()
139140
found_dags = self.process_file(
140141
filepath=correct_maybe_zipped(filepath), only_if_updated=False)
141142

@@ -248,7 +249,7 @@ def process_file(self, filepath, only_if_updated=True, safe_mode=True):
248249
if isinstance(dag, DAG):
249250
if not dag.full_filepath:
250251
dag.full_filepath = filepath
251-
if dag.fileloc != filepath and not is_zipfile:
252+
if dag.get_local_fileloc() != filepath and not is_zipfile:
252253
dag.fileloc = filepath
253254
try:
254255
dag.is_subdag = False

airflow/sensors/external_task_sensor.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -125,12 +125,12 @@ def poke(self, context, session=None):
125125
raise AirflowException('The external DAG '
126126
'{} does not exist.'.format(self.external_dag_id))
127127
else:
128-
if not os.path.exists(dag_to_wait.fileloc):
128+
if not os.path.exists(dag_to_wait.get_local_fileloc()):
129129
raise AirflowException('The external DAG '
130130
'{} was deleted.'.format(self.external_dag_id))
131131

132132
if self.external_task_id:
133-
refreshed_dag_info = DagBag(dag_to_wait.fileloc).get_dag(self.external_dag_id)
133+
refreshed_dag_info = DagBag(dag_to_wait.get_local_fileloc()).get_dag(self.external_dag_id)
134134
if not refreshed_dag_info.has_task(self.external_task_id):
135135
raise AirflowException('The external task'
136136
'{} in DAG {} does not exist.'.format(self.external_task_id,

airflow/version.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -18,5 +18,5 @@
1818
# under the License.
1919
#
2020

21-
version = '1.10.4+twtr5'
21+
version = '1.10.4+twtr6'
2222

0 commit comments

Comments
 (0)