[AIRFLOW-5567] BaseReschedulePokeOperator #6210
Conversation
My team was thinking of building similar features, but we haven't started work and it's possible our plans will change. Either way, I will gladly help implement this feature in the community. One observation I can add now: it is useful to be able to execute code after the operation is completed.
Executing code only before the operator runs is insufficient in more complex processes. The current implementation also lacks a mechanism for passing the context of an operation to the next step, e.g. the ID of the created object.
@mik-laj this is mostly a discussion piece of a PR, so thanks for joining in!
I think it's worth introducing a standardized way of passing a resource ID. This will also be very useful in extra links.
```python
        Check if we have the XCOM_EXTERNAL_RESOURCE_ID_KEY
        for this task and call submit_request if it is missing.
        """
        if not self.get_external_resource_id(context):
```
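For context, here is the shape of the pattern under review, reconstructed as a minimal sketch from the diff fragments quoted in this thread. The method names match the PR; the bodies are an approximation, not the exact PR code.

```python
from airflow.models.xcom import XCOM_EXTERNAL_RESOURCE_ID_KEY  # key added by this PR
from airflow.sensors.base_sensor_operator import BaseSensorOperator


class BaseAsyncOperator(BaseSensorOperator):
    """Submit a long-running request once, then poke for completion."""

    def get_external_resource_id(self, context):
        return context['ti'].xcom_pull(
            task_ids=self.task_id, key=XCOM_EXTERNAL_RESOURCE_ID_KEY)

    def set_external_resource_id(self, context, value):
        context['ti'].xcom_push(key=XCOM_EXTERNAL_RESOURCE_ID_KEY, value=value)

    def execute(self, context):
        # Submit only if we have not done so already (the guard above),
        # then let the sensor machinery poke until poke() returns True.
        if not self.get_external_resource_id(context):
            self.set_external_resource_id(context, self.submit_request(context))
        super().execute(context)
        self.process_results(context)
```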
I think that we should not rely on this content. I can imagine that there may be operators that do not operate on any identifier.
If an operator doesn't operate on an identifier, what is there to poke? Maybe we can have BaseAsyncOperator set a default 'PLACEHOLDER_ID'?
It may be that only one such object can exist in the universe, in which case the question of a unique identifier always comes up. After thinking about it for a while, though, I have concerns about performance: now we have one query to the database, and if we also tried to save the state there would always be two queries.
Apologies, I'm really not following you here.
By "one object in the universe" do you mean there can only be one external resource?
Why is the object uniqueness in question?
Can you explain the performance concern? Performance of what? The task? The submit? I'm envisioning this async operator pattern only being used for long-running operations where you can tolerate a poke interval of more than one minute.
```python
                 **kwargs) -> None:
        super().__init__(mode='reschedule', *args, **kwargs)

    def submit_request(self, context) -> string:
```
Suggested change:

```diff
-    def submit_request(self, context) -> string:
+    def submit_request(self, context) -> IT:
```
I imagine that the identifier will be a different type, e.g. a number, or an array with several identifiers. What do you think about using generic types?
@mik-laj why not string / dict / json-serializable string? if this is using xcom, and xcom pickling is going away, is that not a good solution? Even if we were to add a new database column and not use xcom, wouldn't json serializability be desired? (have not really used generics and not sure what their value is here... interested in understanding more about what you are suggesting)
A generic class allows you to provide information about the type, and at the same time specify that the same type occurs in several places. It can still be a string or a dict or another serializable value, but that won't be specified by this class; it will have to be determined by the child class.
For example: suppose we have a Stack class with two methods, `push(A) -> None` and `poll() -> A`. If we want to express that `A` must be the same type in both cases, we can mark it using a generic. The child class then has to specify the type, and we don't have to, because we don't know what type it will be. I know it should be some subset of types, but I don't know which type exactly.
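A minimal sketch of that idea using Python's `typing` module and the Stack example above (names are illustrative):

```python
from typing import Generic, List, TypeVar

IT = TypeVar('IT')  # the identifier type, left open by the base class


class Stack(Generic[IT]):
    """push() and poll() are forced to agree on the same type IT."""

    def __init__(self) -> None:
        self._items: List[IT] = []

    def push(self, item: IT) -> None:
        self._items.append(item)

    def poll(self) -> IT:
        return self._items.pop()


class StringStack(Stack[str]):
    """The child class pins IT to str for both methods at once."""
```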
A very useful change. I hope that together we will be able to develop it further.
```python
from airflow.exceptions import AirflowException
from airflow.models.xcom import XCOM_EXTERNAL_RESOURCE_ID_KEY


class BaseAsyncOperator(BaseSensorOperator, SkipMixin):
```
I had a similar thought process. Where I ended up was that the implementation of the common parent would be nearly identical to `BaseSensorOperator`, the logic being that `BaseAsyncOperator` just implements a few more methods (e.g. `[submit/process]_request`) and is opinionated that `mode=reschedule`.
The main reasoning to subclass vs. enhancing the existing Sensor is that I wanted to use the same `execute` method but have a class nomenclature indicating that subclasses of this are intended to take action. I think people are less likely to understand that a Sensor can take action rather than being a read-only poller.
The spirit of this class is to provide an improved way of doing what we'd traditionally do with Operators, so I wanted a class with the name Operator. Perhaps this is short-sighted.
I've refactored to do the submit / process in the execute method of `BaseSensorOperator`.
Ref. the name, I agree that it's important for people to consider this as something that takes action and that's implied by the Operator suffix. That's why I consider BaseSensorOperator in its current form to have a misleading name: it implies that it's both for taking action and sensing. Luckily the actual derived sensor implementations are all suffixed with Sensor - e.g. EmrStepSensor - which doesn't mislead.
I feel like having "Async" in your new class highlights the wrong thing. It's not that it is async in the sense of non-synchronous or rescheduled (Airflow terminology) that makes this class unique, since all sensors derived from BaseSensorOperator can operate as async/rescheduled: it implies not occupying a task slot. What differentiates your class is that it combines action and sensing. That's why I tend to think that the already used BaseSensorOperator would actually be a great name to describe what your new class does; so why not enhance the existing class to optionally do the action part?
There might be technical reasons to have a separate class - e.g. code is easier to read or to reduce risk by extending rather than modifying (SOLID) - but I can't think of a better alternative to "Async": BaseDoThenWaitOperator (hah); BaseActionSensor; maybe BaseAtomicOperator; eventually back to BaseSensorOperator.
I like the idea of async operators! However, I have a small concern regarding the approach to developing new operators. How should I decide whether I should create an async operator or an operator+sensor pair? Or should I try to create an operator that could be used in sync or async mode by passing an `async=True` flag?
@nuclearpinguin I think those concerns are quite valid.
IMO, if your operator and sensor refer to the same resource (job, query, etc.) you'd almost always want to use an async operator. This has to do with retries: if your sensor fails and you want to retry the submit, then it should be an async operator (this wouldn't be possible with operator+sensor without a whole re-run of the DAG). I'd love to hear differing opinions here.
I was just thinking about this today as well. Similarly: "for our existing operators, how do we expose async versions?" Are these new operators? I think you could develop a new operator that extends this one and branches in `execute`:

```python
if self.async:
    super().execute(context)  # use the `BaseAsyncOperator`'s execute defined in this PR
else:
    # Note: this is what most operators that wait on long-running operations look like.
    self.submit_request(context)
    while not self.poke():
        time.sleep(self.poke_interval)
    self.process_results(context)
```
I can refactor the
Digging deeper on
Dataproc operators will be rewritten to use the google-cloud-* libraries in the near future. Will that bother you?
@mik-laj Is there any operator you can think of that might be a better example to refactor as an async operator, so that it might not be throw-away work?
I have noticed, in looking at the existing Dataflow and Dataproc hooks, that many hooks conflate job submission and
Not sure what cloud provider you're using, but if you have access to AWS: EmrAddStepsOperator and EmrStepSensor. Even if you don't, these do have existing unit tests. There's an ongoing thread (link) in airflow-dev about exactly these.
```python
from airflow.utils.decorators import apply_defaults


class BaseAsyncOperator(BaseSensorOperator, SkipMixin):
```
Is there a need to reference SkipMixin given that BaseSensorOperator already extends this class?
```python
                 **kwargs) -> None:
        super().__init__(mode='reschedule', *args, **kwargs)

    @abstractmethod
```
Should we extend ABC in this class? Maybe there is a reason we don't do this?
```python
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
```
OK, I think I may have found an issue.
In live testing it appeared XCom was not recorded. In actuality, the problem is that XCom is cleared when the task restarts after a reschedule. So after the first rescheduled poke, the XCom is obliterated and the resource id is lost.
XCom data appears to be cleared at the start of each task. See here. So when the task restarts after a reschedule, we lose the resource id.
Probably a similar explanation for the invisible-logs issue I commented on earlier.
Here's my sample operator:

```python
class Op(BaseAsyncOperator):
    def submit_request(self, context: Dict) -> str:
        return '129uh8981h9u80eh'

    def poke(self, context):
        ti = context['ti']
        print(f"try_number: {ti.try_number}")
        for k, v in context.items():
            print(f"{k}: {v}")
        print("\n sleeping")
        import time
        time.sleep(60)
        return False
```
Not sure what the best way to resolve this is.
It's also curious that the state goes initially to `up_for_reschedule` before later becoming `up_for_retry`... I am not sure why that is, but I have not used sensors / rescheduling before.
> XCom data appears to be cleared at the start of each task. See here. So when the task restarts after a reschedule, we lose the resource id.

Ouch, that is quite a limitation for using XCom. Is there anywhere else to store task state? If not, some options:

1. Use `mode='poke'`, at the expense of using up a task slot for long-running tasks. Not ideal, but gets "correct" behaviour. It'd then be "atomic" rather than "async" behaviour.
2. Enhance `TaskInstance` to make clearing XCom data conditional by delegating it to the task/operator. There could be a new function like `BaseOperator.pre_execute_clear_state()` which can be overridden by implementers. `BaseOperator` is already aware of / coupled with `TaskInstance`, so I don't think we'd be breaking separation of concerns any more than it already is?
3. There might be enough justification to say that for rescheduled tasks (i.e. the transition from `State.UP_FOR_RESCHEDULE` to `State.RUNNING`), `TaskInstance` shouldn't clear XCom. The call to `run()` -> `_check_and_change_state_before_execution()` does know about this state change, but I see in the code that there are places which bypass the call to `run()` and go directly to `_run_raw_task()` (e.g. the CLI).
Seems that some form of option 2 is the least risky way to get the desired outcome.
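A minimal sketch of what option 2 could look like. The hook name `pre_execute_clear_state()` comes from the comment above; the `TaskInstance` methods shown here (including `was_rescheduled()`) are assumptions, not current Airflow code.

```python
class BaseOperator:
    def pre_execute_clear_state(self, ti):
        # Default behaviour stays as today: wipe XCom before every try.
        ti.clear_xcom_data()


class BaseAsyncOperator(BaseOperator):
    def pre_execute_clear_state(self, ti):
        # Retain XCom (and thus the external resource id) across reschedules;
        # only clear on a genuinely fresh attempt. was_rescheduled() is a
        # hypothetical helper on TaskInstance.
        if not ti.was_rescheduled():
            ti.clear_xcom_data()
```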
Ref. `State.UP_FOR_RESCHEDULE`: this was added back in January by @seelmann as part of AIRFLOW-2747. Perhaps he has some thoughts on whether or not we can change the contract for rescheduled tasks to say that we retain XCom state? It's a fairly recent feature, so that might be okay, e.g. operators may not be depending on the current behaviour.
It seems to me we definitely want to wipe XComs at the end of a DagRun, so as not to affect a re-run of this DAG for this execution date.
What kind of consensus should we build on that kind of change for XCom in reschedule scenarios?
We should be careful with renaming classes. I think the async capabilities should be merged into the BaseOperator.
I agree with trying to put the behaviour into BaseOperator. As it stands, the use-cases are:

1. Only operate (`BaseOperator` <- e.g. `EmrAddStepsOperator`)
2. Only sense (`BaseSensorOperator` <- e.g. `EmrStepSensor`)
3. Do both (to be determined)
4. Do none (`BaseOperator` <- e.g. `DummyOperator`)
Currently 1, 2 and 4 are implemented as variations of execute(); 2 is the odd one out in having a special class, as would 3 with the current proposal.
If BaseOperator had a default execute() that runs two phases, the other behaviours can be achieved by optionally implementing either of the phases.
It's a typical problem using class hierarchies for behaviours, that it's hard to mix and match. I was actually thinking about whether we could use traits to keep the behaviours cleanly separated into their own implementations. But it's probably overkill with so few.
Something else to consider about a potential "multi-phase" BaseOperator is that we already have `pre_execute()` + `execute()` + `post_execute()`, and the order is governed by `TaskInstance._run_raw_task()` (link). My understanding is that these hooks are always called, even when tasks are rescheduled. I'm not entirely sure of the purpose; the docs simply say "... it's mostly a hook for people deriving operators", and the only two places I can see in the source code where they're used are: CloudSqlQueryOperator, to ensure the db connection is created and torn down; and Lineage support, to ensure the lists of inlets and outlets are built and saved to XCom (another kind of state retention!).
When considering extra phases, they'd be orthogonal to above and really some form of "execution phase N": the BaseOperator starts to behave a bit like a SubDag where the overall task status / state machine runs its course while what would have been sub-tasks in the SubDag now run as sequential execution phases in the same task. Instead of the sub-tasks being visible in the UI as DAG nodes, these execution phases are internalised into the BaseOperator.
Why would we want to do that? The justification is that these execution phases are tightly coupled and should be retried together (and in order) as some kind of atomic unit.
If we were to generalise this much, I think we should support being able to run through the execution phases synchronously (taking up a task slot) or asynchronously (using reschedule).
Sorry this is getting a bit philosophical but I'm trying to provide some guiding context about why we're doing this! I also see it as a potential future foundation for a mechanism to compose operators via the Airflow API.
Back to the practical, can I propose:

1. New BaseOperator hooks are implemented - something like `execute_phase_start()` and `execute_phase_final()` - that by default simply `raise NotImplementedError()`, as per the current `execute()` implementation.
2. A new BaseOperator default `execute()` implementation that calls the two hooks in order. If the task was rescheduled, skip the first phase. I've seen code that detects this case in `BaseSensorOperator.execute()` (link).
3. Existing overrides of `BaseOperator.execute()` should then behave as-is, i.e. the phase hooks are simply never called. The current `BaseSensorOperator` implementation would continue to work, minimising the amount of work needed in this PR; it could later be reimplemented to use the new phased execution.
4. Any new operators implemented to combine an operation + sensor can override the new phase hooks, since they'd be multi-phase aware.
Allowing only two phases simplifies the implementation for now given the immediate requirements, but could even be extended in future for any number of phases if we wanted to store the current/next phase as part of the task instance state. I can't think of a good practical need right now.
What do you guys think, is this proposal (or something like it) workable?
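To make points 1 and 2 concrete, here is a rough sketch. The hook names come from the proposal above; `_was_rescheduled()` stands in for the reschedule-detection trick referenced there and is an assumption, not existing Airflow code.

```python
class BaseOperator:
    def execute_phase_start(self, context):
        raise NotImplementedError()  # e.g. submit the job

    def execute_phase_final(self, context):
        raise NotImplementedError()  # e.g. wait for and process results

    def execute(self, context):
        # Default implementation: run both phases in order, but skip the
        # first phase if this try is a continuation after a reschedule.
        if not self._was_rescheduled(context):
            self.execute_phase_start(context)
        self.execute_phase_final(context)

    def _was_rescheduled(self, context):
        # Hypothetical helper: could query TaskReschedule rows for this
        # task instance, as BaseSensorOperator.execute() does today.
        return False
```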
I can't see why we need another `execute_phase_{start,final}`. The pre_execute can be overridden by a custom implementation, but in practice this hasn't been done much.
So `TaskInstance._run_raw_task()` is what's being executed on the worker, and this will just actually run the whole task to the end.

> The justification is that these execution phases are tightly coupled and should be retried together (and in order) as some kind of atomic unit.

I really agree with that. We've seen situations where the DAG would start with creating a cluster, but this breaks the atomicity of the DAG. When the cluster is killed (maybe because of an idle timeout), the only way to retry the workflow is by restarting the whole DAG.
It is still not clear to me what the hierarchy will look like. I think we should move most of the reschedule logic into the BaseOperator. Maybe we can enrich the `context` which is being passed to the execute method with reschedule logic, to let the implementation of the execute step know that you're rescheduling.
```python
        self.assertIsNotNone(resource_id)
        self.assertTrue(resource_id.startswith('job_id'))

    def test_ok_with_reschedule(self):
```
@dstandish I'll try to improve this unit test until it reproduces your issue w/ XCom clearing.
The PR has been merged; #6370
Yes, but I need more time to get this to work.
@jaketf @JonnyIncognito I'm getting a lot of pushback on idempotency, which is a fair point. Keeping state makes the function impure by definition. How about adding labels to the services that we launch on GCP? The label set should contain the tuple (composer_instance, dag_id, task_id, execution_date) to make it unique. In that case, we could just query the state using the GCP APIs. WDYT?
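A sketch of the labelling idea. Everything here is illustrative: the composer instance name would have to come from configuration, and real GCP labels impose character restrictions on values.

```python
def unique_resource_labels(context):
    """Build a label set that uniquely identifies the external resource."""
    ti = context['ti']
    return {
        'composer_instance': 'my-composer-env',  # assumed known from config
        'dag_id': ti.dag_id,
        'task_id': ti.task_id,
        # GCP label values allow lowercase letters, digits, dashes, underscores.
        'execution_date': ti.execution_date.strftime('%Y-%m-%dt%H-%M-%S'),
    }
```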
@Fokko are you considering abandoning persisting TI state in the metastore, so that you'd persist state externally? I think this TI state persistence is a crucial feature and must be supported. XCom can support it with appropriate changes; we just have to figure out what those are. Another option is to just let the writer of the operator choose when XCom is cleared, by calling `clear_xcom_data` when appropriate.
I would like to investigate other options. I'm not ruling anything out, but I want to think about it. If we could avoid storing state in the metastore, it would make life simpler, and I like simple.
@Fokko the ability to retain state is super important for a rescheduled task, I agree with @dstandish on this. These are mostly used where external long-lived resources are created. For sensors we've dodged the issue because they tend to use XCom to fetch the state from a prior task. But if we're now going further and having multi-phased / composed tasks, there needs to be a general Airflow mechanism for that state rather than hacking around it with e.g. cloud-provider specific things like labels.
> Keeping state makes the function impure by definition.

Depends on the scope of the idempotency. I can see why this argument could be used for existing operators that are supposed to do the same thing for every call of `execute()`. However, ones with multiple phases should be idempotent by the time the task succeeds, across multiple calls to `execute()`, i.e. the scope is a bit wider.
How about we only clear the state when needed? Options:

1. Delegate clearing to the task (default implementation always clears). If you look at my proposal in the other thread (link), the task could clear all state on the initial phase but not for subsequent phases / reschedules.
2. Clear state only if there was no prior transition to `State.UP_FOR_RESCHEDULE`. I've seen code that detects this, but it'd add a cost for every user (link).
3. Add some concept of a reschedule counter to the task instance and only clear all state on the first execution. If it were more along the lines of a phase counter, it could later be used for tasks that have more than two phases, to figure out which code path to follow on each `execute()`.
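For reference, the reschedule-detection trick mentioned in option 2, roughly as `BaseSensorOperator.execute()` does it in Airflow 1.10 (a paraphrased sketch, not a verbatim copy):

```python
from airflow.models import TaskReschedule


def is_first_try(context):
    # Prior TaskReschedule rows mean this run is a continuation after a
    # reschedule, so state should be retained rather than cleared.
    return not TaskReschedule.find_for_task_instance(context['ti'])
```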
So the problem is with `ti.xcom_pull(include_prior_dates=True)`: if you run it on the first poll, the result might be different than on subsequent polls. Personally I would be fine with the first option, and having some specific key for storing the state.
@JonnyIncognito I'm a GCP-er and don't have AWS access.
@jaketf I'm almost certainly going to write an operator that combines EmrAddStepsOperator and EmrStepSensor on top of your work once we figure out the state retention issue, so I'd be happy to contribute it to your PR as an example if you don't get to a GCP example first. Question is what we're going to name this style of operators, related to the other thread about the class hierarchy and base naming. Options:
I suspect these are going to become very popular, so it's important to start off with a good convention!
```python
from airflow.ti_deps.deps.ready_to_reschedule import ReadyToRescheduleDep


class BaseReschedulePokeOperator(BaseOperator, SkipMixin, ABC):
```
I wonder how `on_kill` gets invoked if we shut down the task through the Airflow UI while the task is waiting to be rescheduled.
that is a really good point. I'm not sure. I imagine it wouldn't run until the task was rescheduled.
```python
        self.set_external_resource_id(context, None)

    @staticmethod
    def set_external_resource_id(context, value):
```
I would generalize this, and maybe call it `set_state()`. Please add type annotations.
So there were concerns with @Fokko's XCom change re idempotency. I think it makes sense to create a second table, very similar to XCom, but designed specifically to support stateful tasks. The table could perhaps be called TaskState. I previously shared the concern: why create another table that is almost identical to XCom? But the reality is that XCom is problematic for stateful tasks in a number of ways. Obviously there is the clearing / idempotency issue. But additionally, if you use trigger DAG, with XCom your next scheduled run won't get the current state, because it sorts by execution_date. WDYT?
@dstandish Sorry for the late reply. I think having a second table or changing the existing XCom table is just an implementation detail. Having a state table will have a fundamental impact on the idempotency of the execution of the tasks. Why would the manual triggering of a DAG introduce issues? The execution date will be equal to the moment that it was triggered, so I think it should work as well. Since this will introduce such a fundamental change to the way operators were intended (being idempotent), I think it would be great to first start an AIP on the topic, so we can have a clear and structured approach.
It's optional to use such a thing, just like it is with XCom. If you don't use it, nothing changes.
Because execution_date is the run date minus one interval, and
Outcome:
The schedule-interval edge PR would resolve the execution-date ordering problem. But if XCom is cleared at the start of a task, it remains problematic as a mechanism for state persistence.
An AIP sounds appropriate. But I'll just note that I see this more as better support for a common use pattern than a fundamental change of anything. I suspect stateful use of Airflow (including the use of XCom as a state persistence mechanism) is quite common.
This doesn't help this immediate PR... BUT... I just realized that, as a user, you can add a model, e.g. an XCom-like table that is designed how you want it. AND, using the plugins system, you can add an admin view for that model so you can inspect and edit the entries just as you would with XCom. So if you personally need to manage state for tasks and cannot wait until this functionality makes it into Airflow, you can roll your own, and it's not that hard (see the sketch below). If anyone wants sample code I am happy to share; you can ping me on Slack. So stoked to have stumbled upon this.
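A minimal sketch of such a roll-your-own model. Columns and names are illustrative; a real plugin would register this against Airflow's metadata database (reusing Airflow's own declarative `Base`) and add a ModelView for it.

```python
from sqlalchemy import Column, String, Text
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()  # a real plugin would reuse Airflow's own Base


class TaskState(Base):
    """XCom-like table keyed only by (dag_id, task_id), so the latest state
    survives across runs and is unaffected by execution_date ordering."""

    __tablename__ = 'task_state'

    dag_id = Column(String(250), primary_key=True)
    task_id = Column(String(250), primary_key=True)
    state = Column(Text)  # e.g. a JSON blob with the external resource id
```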
Some of this topic seems to be covered in
I think the conversation here has been educational, and thanks all for chiming in. However, there was some discussion that adding support for stateful tasks in Airflow would have broader impact than just this rescheduling case. From my read through all these threads, it seems the key open questions for a rescheduling approach similar to this PR are:
I'm not that familiar with the AIP process, and whether it's something a small group of people from the community get aligned on in a meeting/Slack before filing, or something an individual just proposes.
I think the original AsyncOperator was a better name for this :). And in fact it looks like it is a quite highly requested feature in the Airflow Survey (https://airflow.apache.org/blog/airflow-survey/):
I am also interested in discussing this, so I will create a channel, #sig-async-operators, in Slack and invite you all there.
This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.