[EWT-16]: Airflow fix for manual trigger during version upgrade #13
Conversation
airflow/models/dag.py
Outdated
@@ -1506,6 +1506,12 @@ def safe_dag_id(self):
         return self.dag_id.replace('.', '__dot__')

     def get_dag(self):
         try:
             path_split = self.fileloc.split("airflow_home")[1]
This seems a bit brittle: if someone creates a dir called airflow_home in their dag dir, it will break. Curious why we can't split on the whole $AIRFLOW_HOME rather than just "airflow_home" (which can also break if we rename our airflow home dir).
The issue here is that the webserver is getting the scheduler's path (for the DAG), and since that path doesn't exist on the webserver, the manual trigger breaks.
To fix this, I have put in this hack. I think we can replace it with a regex; that should be good.
Can you explain why this change is needed in a lot of detail, add a comment to that effect in the code, and also make the info message clearer? Someone is likely to bump into a problem with this code in the future, and it could save them days of work/investigation. Since this is a fork, can we also file an upstream JIRA ticket to fix the root cause of the issue, and add a TODO here referencing that JIRA? Ideally we would resolve this issue upstream since it will probably bite someone again in the next few releases, but this is out of scope for your current task.
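To illustrate the brittleness being discussed: `str.split` splits on every occurrence of the separator, so a directory named `airflow_home` anywhere under the dags folder silently truncates the result. The paths below are illustrative assumptions, not the deployment's actual paths.

```python
# Hypothetical scheduler-side absolute path (illustrative only).
scheduler_fileloc = "/var/lib/mesos/airflow_home/dags/example_dag.py"

# The approach under review: split on the literal string "airflow_home".
relative = scheduler_fileloc.split("airflow_home")[1]
print(relative)  # -> /dags/example_dag.py

# A nested "airflow_home" directory defeats the split, because split()
# cuts at EVERY occurrence of the separator: element [1] stops at the
# second match and the rest of the path is lost.
tricky = "/var/lib/mesos/airflow_home/dags/airflow_home/my_dag.py"
print(tricky.split("airflow_home")[1])  # -> /dags/
```

This is exactly the failure mode the comment describes: the split succeeds without raising, so the bad path propagates silently.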
@@ -37,7 +37,6 @@
 from email.mime.multipart import MIMEMultipart
 from email.mime.text import MIMEText
 from numpy.testing import assert_array_almost_equal
-from six.moves.urllib.parse import urlencode
flake8 fix
@@ -20,7 +20,6 @@
 import os
 import unittest
 from datetime import datetime, timedelta
-from tempfile import NamedTemporaryFile
flake8 fix
Added this as a TODO; will create a JIRA for it and resolve this upstream.
airflow/models/dag.py
Outdated
@@ -1506,6 +1505,17 @@ def safe_dag_id(self):
         return self.dag_id.replace('.', '__dot__')

     def get_dag(self):
         # TODO: Resolve this in upstream by storing relative path in db (config driven)
         try:
             # fix for manual trigger, as db stores scheduler path which is different from webserver
Change to: # Fix for DAGs that are manually triggered in the UI, as the DAG path in the DB is stored by the scheduler which has a different path than the webserver due to absolute paths in aurora including randomly generated job-specific directories. Due to this the path the webserver uses when it tries to trigger a DAG does not match the existing scheduler path and the DAG can not be found.
airflow/models/dag.py
Outdated
@@ -1506,6 +1505,17 @@ def safe_dag_id(self):
         return self.dag_id.replace('.', '__dot__')

     def get_dag(self):
         # TODO: Resolve this in upstream by storing relative path in db (config driven)
Please add the internal JIRA ticket under the MLWF project # TODO(CX-######):
         try:
             # fix for manual trigger, as db stores scheduler path which is different from webserver
             # path, as a result, this function returns NONE
             path_regex = "airflow_scheduler-.-[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[" \
Wouldn't this break for locally run airflow clusters?
Can we decouple this from the airflow scheduler job name? Currently, if we change the job name it will cause a very-hard-to-debug bug and will require an Airflow release to change the path here.
I feel like it's probably less brittle, simpler, and less likely to break to just take the last couple of sections of path.split(os.path.sep) rather than have this regex logic. You can hardcode dags_folder in server role configs since it shouldn't even be configurable (and your change would break on a custom dags_folder anyway). Then we can add a tiny internal unit test that basically just compares the constant with the hardcoded dag folder against a specific string, with an error message like "Airflow code assumes the dag folder looks like X/Y/Z; if you want to change the format you need to update the Airflow code as well, and then change this test to match". This will prevent someone from accidentally breaking Airflow internally in a hard-to-debug way in the future if they change the dags_folder.
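A minimal sketch of this suggestion, assuming a layout of `<anything>/dags/<dag_file>.py` (the folder name `dags`, the helper name, and the example paths are illustrative, not the actual deployment config):

```python
import os

# Assumed, hardcoded dags folder name; the review suggests pinning this
# in server role configs rather than making it configurable.
DAGS_FOLDER_NAME = "dags"

def relative_dag_path(fileloc):
    """Keep only the path components from the dags folder onward,
    ignoring any host-specific (e.g. job-specific random) prefix."""
    parts = fileloc.split(os.path.sep)
    if DAGS_FOLDER_NAME in parts:
        idx = parts.index(DAGS_FOLDER_NAME)
        return os.path.sep.join(parts[idx:])
    return fileloc  # no dags folder in the path: fall back unchanged

# Tiny guard test in the spirit of the comment: if someone changes the
# dag folder layout, this fails loudly instead of silently breaking
# manual triggers.
assert relative_dag_path("/srv/job-1234abcd/dags/team/my_dag.py") == "dags/team/my_dag.py", (
    "Airflow code assumes the dag folder looks like .../dags/<file>; "
    "if you change the layout, update relative_dag_path and this test to match"
)
```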
This won't break locally. If the path doesn't match the regex, it picks the default path.
This is what I am planning to do: if we can store the relative path in the DB and make it config-driven, then we can resolve this.
So the path in the DB would be stored as "DAG_HOME_PATH/<relative_path>", where 'DAG_HOME_PATH' is a literal string that would be replaced by $DAG_HOME_PATH in the respective webserver, scheduler, and worker.
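The placeholder scheme described above can be sketched as two small helpers. The function names and example paths are hypothetical; only the `DAG_HOME_PATH` literal comes from the proposal itself:

```python
PLACEHOLDER = "DAG_HOME_PATH"

def to_db_path(fileloc, dag_home):
    """Replace this process's dag home prefix with the placeholder
    before the path is written to the DB."""
    if fileloc.startswith(dag_home):
        return PLACEHOLDER + fileloc[len(dag_home):]
    return fileloc

def from_db_path(stored, dag_home):
    """Expand the placeholder with this process's own $DAG_HOME_PATH
    when the path is read back from the DB."""
    return stored.replace(PLACEHOLDER, dag_home, 1)

# The scheduler stores using its own home; the webserver expands using
# its own, so each component resolves a valid local path.
stored = to_db_path("/scheduler/home/dags/my_dag.py", "/scheduler/home")
print(stored)  # -> DAG_HOME_PATH/dags/my_dag.py
print(from_db_path(stored, "/webserver/home"))  # -> /webserver/home/dags/my_dag.py
```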
If we are going to do that solution for open source first, then I'm OK with it. Otherwise I don't like the idea of storing things in the DB; the DB is a fork that's harder to manage than code, and we can put it in the airflow config instead.
If you are planning on making the change in open source, I'm not sure how easy that would be to do in a proper way; Sumit might have more context and should be able to help you out there.
We have already seen some issues in open source due to the full path being stored in the DB, so storing the relative path in the DB instead of a full path would be a great improvement overall. Also, we are not talking about a fork here and will try to get this done in open source only, but until then we have to rely on some brittle solution like this (regex).
CP contains [EWT-16]: Airflow fix for manual trigger during version upgrade (twitter-forks#13)
* [EWT-16]: Airflow fix for manual trigger during version upgrade
* EWT-569 : Initial Commit for migrations
* [EWT-569] Airflow Upgrade to 1.10.14, Cherry-Pick 76fe7ac from 1.10.4
* CP Contains fb64f2e: [TWTR][AIRFLOW-XXX] Twitter Airflow Customizations + Fixup job scheduling without explicit_defaults_for_timestamp
* [EWT-569] Airflow Upgrade to 1.10.14, Cherry-Pick 91d2b00 [CP][EWT-548][AIRFLOW-6527] Make send_task_to_executor timeout configurable (#63)
* [EWT-569] Airflow Upgrade to 1.10.14, Cherry-Pick 91d2b00 CP contains [EWT-16]: Airflow fix for manual trigger during version upgrade (#13)
* [EWT-16]: Airflow fix for manual trigger during version upgrade
* [EWT-569] Airflow Upgrade to 1.10.14, Cherry-Pick 91d2b00 [CP][EWT-548][AIRFLOW-6527] Make send_task_to_executor timeout configurable (#63) CP of f757a54
* CP(55bb579) [AIRFLOW-5597] Linkify urls in task instance log (#16)
* [EWT-569] Airflow Upgrade to 1.10.14 [CP] from 1.10.4+twtr : 94cdcf6 [CP] Contains [AIRFLOW-5597] Linkify urls in task instance log CP of f757a54
* [EWT-569] Airflow Upgrade to 1.10.14, Cherry-Pick 4ce8d4c from 1.10.4 CP contains [TWTTR] Fix for rendering code on UI (#34)
* [EWT-569] Airflow Upgrade to 1.10.14, Cherry-Pick 299b4d8 from 1.10.4 CP contains [TWTR] CP from 1.10+twtr (#35)
* 99ee040: CP from 1.10+twtr
* 2e01c24: CP from 1.10.4 ([TWTR][AIRFLOW-4939] Fixup use of fallback kwarg in conf.getint)
* 00cb4ae: [TWTR][AIRFLOW-XXXX] Cherry-pick d4a83bc and bump version (#21)
* CP 51b1aee: Relax version requiremets (#24)
* CP 67a4d1c: [CX-16266] Change with reference to 1a4c164 commit in open source (#25)
* CP 54bd095: [TWTR][CX-17516] Queue tasks already being handled by the executor (#26)
* CP 87fcc1c: [TWTR][CX-17516] Requeue tasks in the queued state (#27)
* CP 98a1ca9: [AIRFLOW-6625] Explicitly log using utf-8 encoding (apache#7247) (#31)
* [EWT-569] Airflow Upgrade to 1.10.14 [CP] from 1.10.4+twtr : f7050fb CP Contains Experiment API path fix (#37)
* [EWT-569] Airflow Upgrade to 1.10.14, Cherry-Pick 8a689af from 1.10.4 CP Contains Export scheduler env variable into worker pods. (#38)
* [EWT-569] Airflow Upgrade to 1.10.14, Cherry-Pick 5875a15 from 1.10.4 Cp Contains [EWT-115][EWT-118] Initialise dag var to None and fix for DagModel.fileloc (missed in EWT-16) (#39)
* [EWT-569] Airflow Upgrade to 1.10.14, Cherry-Pick a68e2b3 from 1.10.4 [CX-16591] Fix regex to work with impersonated clusters like airflow_scheduler_ddavydov (#42)
* [EWT-569] Airflow Upgrade to 1.10.14 [CP] from 1.10.4+twtr : e9642c2 [CP][EWT-128] Fetch task logs from worker pods (19ac45a) (#43)
* [EWT-569] Airflow Upgrade to 1.10.14 [CP] from 1.10.4+twtr : d5d0a07 [CP][AIRFLOW-6561][EWT-290]: Adding priority class and default resource for worker pod. (#47)
* [EWT-569] Airflow Upgrade to 1.10.14 [CP] from 1.10.4+twtr : 9b58c88 [CP][EWT-302]Patch Pool.DEFAULT_POOL_NAME in BaseOperator (apache#8587) (#49) Open source commit id: b37ce29
* [EWT-569] Airflow Upgrade to 1.10.14 [CP] from 1.10.4+twtr : 7b52a71 [CP][AIRFLOW-3121] Define closed property on StreamLogWriter (apache#3955) (#52) CP of 2d5b8a5
* [EWT-361] Fix broken regex pattern for extracting dataflow job id (#51) Update the dataflow URL regex as per AIRFLOW-9323
* [EWT-569] Airflow Upgrade to 1.10.14 [CP] from 1.10.4+twtr : 4b5b977 EWT-370: Use python3 to launch the dataflow job. (#53)
* [EWT-569] Airflow Upgrade to 1.10.14 [CP] from 1.10.4+twtr : 596e24f
* [EWT-450] fixing sla miss triggering duplicate alerts every minute (#56)
* [EWT-569] Airflow Upgrade to 1.10.14 [CP] from 1.10.4+twtr : b3d7fb4 [CP] Handle IntegrityErrors for trigger dagruns & add Stacktrace when DagFileProcessorManager gets killed (#57) CP of faaf179 - from master, CP of 2102122 - from 1.10.12
* [EWT-569] Airflow Upgrade to 1.10.14 [CP] from 1.10.4+twtr : bac4acd [TWTR][EWT-472] Add lifecycle support while launching worker pods (#59)
* [EWT-569] Airflow Upgrade to 1.10.14 [CP] from 1.10.4+twtr : 6162402 [TWTTR] Don't enqueue tasks again if already queued for K8sExecutor (#60) Basically reverting commit 87fcc1c and making changes specifically in the Celery Executor class only.
* [EWT-569] Airflow Upgrade to 1.10.14 [CP] from 1.10.4+twtr : 1991419 [CP][TWTR][EWT-377] Fix DagBag bug when a Dag has invalid schedule_interval (#61) CP of 5605d10 & apache#11462
* [EWT-569] Airflow Upgrade to 1.10.14 [CP] from 1.10.4+twtr : 48be0f9 [TWTR][EWT-350] Reverting the last commit partially (#62)
* [EWT-569] Airflow Upgrade to 1.10.14 [CP] from 1.10.4+twtr : d8c473e [CP][EWT-548][AIRFLOW-6527] Make send_task_to_executor timeout configurable (#63) CP of f757a54
No description provided.