Fix labels used to find queued KubeExecutor pods #19904
Conversation
We need to use the job_id used to queue the TI, not the current scheduler's job_id. These can differ naturally with HA schedulers and with scheduler restarts (clearing "queued but not launched TIs" happens before adoption).
@@ -473,7 +472,7 @@ def clear_not_launched_queued_tasks(self, session=None) -> None:
     base_label_selector = (
         f"dag_id={pod_generator.make_safe_label_value(task.dag_id)},"
         f"task_id={pod_generator.make_safe_label_value(task.task_id)},"
-        f"airflow-worker={pod_generator.make_safe_label_value(str(self.scheduler_job_id))}"
+        f"airflow-worker={pod_generator.make_safe_label_value(str(task.queued_by_job_id))}"
What if the scenario is this:
pod is created, scheduler dies
scheduler comes back online
scheduler starts up the task again... I assume maybe it gets a new queued_by_job_id -- but the pod has the old id?
Does the queued_by_job_id even matter? Should we not just be looking at things that uniquely identify the TI?
Note: It's using the queued_by_job_id from the TI, which will match the label. Eventually the pod will be adopted by another (or the new) scheduler; however, this can run before that happens (it's on an interval, after all), AND this does run before adoption when a scheduler starts.
Now, does it even matter? For better or worse, we use the job_id to help determine which Airflow the task is part of. For example, we watch for events based on the job id as well:
kwargs = {'label_selector': f'airflow-worker={scheduler_job_id}'}
This becomes important if we consider a shared namespace with multiple Airflow worker pods in it. It becomes even more important if we have the same dags/tasks/scheduled runs. There are certainly still issues here, but this at least preserves the status quo for now until we can properly fix everything.
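As a rough sketch of that pattern (illustrative only, not the executor's actual watcher code), an event watch filtered on the airflow-worker label might look like the following, assuming in-cluster config and a placeholder scheduler_job_id:

```python
from kubernetes import client, config, watch

config.load_incluster_config()  # assumes the watcher runs inside the cluster
v1 = client.CoreV1Api()

scheduler_job_id = "123"  # placeholder; in Airflow this is the scheduler's job id

# Stream pod events, but only for pods carrying this scheduler's worker label.
w = watch.Watch()
for event in w.stream(
    v1.list_namespaced_pod,
    namespace="airflow",
    label_selector=f"airflow-worker={scheduler_job_id}",
):
    pod = event["object"]
    print(event["type"], pod.metadata.name, pod.status.phase)
```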
Here is another example in the adoption process:
kwargs = {'label_selector': f'airflow-worker={scheduler_job_id}'}
Thanks
> Note: It's using the queued_by_job_id from the TI, which will match the label. Eventually the pod will be adopted by another (or the new) scheduler; however, this can run before that happens (it's on an interval, after all), AND this does run before adoption when a scheduler starts.
OK, so you are saying that this process (i.e. read the TI from the DB, use it to build the labels, then search for the pod) will, generally speaking, happen before the task would e.g. be failed and retried (at which point it would presumably get a new queued_by_job_id, after which, if the pod were still out there, it would no longer be found this way). If I have you right, then that makes sense and looks good.
> This becomes important if we consider a shared namespace with multiple Airflow worker pods in it. It becomes even more important if we have the same dags/tasks/scheduled runs.

Makes sense. But that does not sound like a good idea!
Separate note... airflow-worker is a misnomer, right? That makes it sound like a Celery worker... though I have also seen it used to refer to k8s exec task pods... and it's not that either...
But yeah, now that you mention it... scheduler job ids are probably just incrementing integers, no? In that case it would not be the most rock-solid of selectors in the shared-namespace scenario. Though I'm sure collisions would generally be very rare, maybe there should be a random cluster identifier created somewhere in the DB with airflow db init that could be used as a more direct way of keeping things separate.
Yeah, that is exactly my plan eventually: generate a random identifier for the instance. Unfortunately, I don't think we have anything we can use for that purpose right now.
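Purely as a sketch of that idea (nothing like this exists in Airflow today; the identifier generation, storage, and label key are all hypothetical), it could be a UUID created once at database initialization and then mixed into the pod labels:

```python
import uuid

# Hypothetical: generate a random instance identifier once (e.g. during
# `airflow db init`) and persist it in the metadata DB; shown here in memory only.
instance_id = uuid.uuid4().hex

# Hypothetical label scheme: scoping selectors by instance id would keep two
# Airflow deployments sharing a namespace from matching each other's pods.
label_selector = f"airflow-instance={instance_id},airflow-worker=42"
print(label_selector)
```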
ti.refresh_from_db()
assert ti.state == State.SCHEDULED
assert mock_kube_client.list_namespaced_pod.call_count == 2
mock_kube_client.list_namespaced_pod.assert_any_call(
I tried assert_has_calls, where you can specify the exact call list (which would be ever so marginally more direct than the approach here), but apparently it does not work with MagicMock, only Mock.
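For reference, here is a small standalone illustration of the assert_any_call style used in the test above; the client and call arguments are placeholders, not the real test fixture.

```python
from unittest import mock

mock_kube_client = mock.MagicMock()

# Simulate the two list calls the code under test would make.
mock_kube_client.list_namespaced_pod("airflow", label_selector="airflow-worker=1")
mock_kube_client.list_namespaced_pod("airflow", label_selector="airflow-worker=2")

assert mock_kube_client.list_namespaced_pod.call_count == 2
# assert_any_call passes if any recorded call matches, regardless of order.
mock_kube_client.list_namespaced_pod.assert_any_call(
    "airflow", label_selector="airflow-worker=2"
)
```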
The PR most likely needs to run the full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly, please rebase it to the latest main at your convenience, or amend the last commit of the PR and push it with --force-with-lease.
We need to use the job_id used to queue the TI, not the current scheduler's job_id. These can differ naturally with HA schedulers and with scheduler restarts (clearing "queued but not launched TIs" happens before adoption). (cherry picked from commit b80084a)