[AIRFLOW-5567] BaseReschedulePokeOperator #6210

Closed
wants to merge 24 commits into from
Changes from 4 commits
121 changes: 121 additions & 0 deletions airflow/models/base_async_operator.py
@@ -0,0 +1,121 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Base Asynchronous Operator for kicking off a long running
operations and polling for completion with reschedule mode.
"""
from abc import abstractmethod
from typing import Dict, List, Optional, Union

from airflow.exceptions import AirflowException, AirflowRescheduleException
from airflow.models import TaskReschedule
from airflow.models.xcom import XCOM_EXTERNAL_RESOURCE_ID_KEY
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults

PLACEHOLDER_RESOURCE_ID = 'RESOURCE_ID_NOT_APPLICABLE'

Contributor @dstandish commented on Oct 16, 2019:
OK, I think I may have found an issue.

In live testing it appeared XCom was not recorded. In actuality, the problem is that XCom is cleared when the task restarts after a reschedule. So after the first rescheduled poke, the XCom is obliterated and the resource id is lost.

XCom data appears to be cleared at the start of each task. See here.

So when the task restarts after a reschedule, we lose the resource id.

Probably a similar explanation for the invisible-logs issue I commented on earlier.

Here's my sample operator:

class Op(BaseAsyncOperator):

    def submit_request(self, context: Dict) -> str:
        return '129uh8981h9u80eh'

    def poke(self, context):
        ti = context['ti']
        print(f"try_number: {ti.try_number}")

        for k, v in context.items():
            print(f"{k}: {v}")

        print("\n sleeping")
        import time
        time.sleep(60)

        return False

Not sure what the best way to resolve this is.

It's also curious that the state initially goes to up_for_reschedule before later becoming up_for_retry... I am not sure why that is, but I have not used sensors / rescheduling before.
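The failure mode above can be reproduced without Airflow. A minimal sketch, where `FakeTI` is a hypothetical stand-in for `TaskInstance` and its unconditional XCom clearing: state pushed on the first try does not survive the restart after a reschedule.

```python
# Minimal sketch of the failure mode: XCom is cleared at the start of
# every task run, so the resource id pushed before the first reschedule
# is gone when the task restarts. FakeTI is a hypothetical stand-in for
# Airflow's TaskInstance.

class FakeTI:
    def __init__(self):
        self.xcom = {}

    def clear_xcom_data(self):
        # TaskInstance does this unconditionally at the start of each run.
        self.xcom.clear()

    def run_once(self, first_try):
        self.clear_xcom_data()
        if first_try:
            # submit_request() pushes the resource id on the first try
            self.xcom['external_resource_id'] = '129uh8981h9u80eh'
        return self.xcom.get('external_resource_id')


ti = FakeTI()
assert ti.run_once(first_try=True) == '129uh8981h9u80eh'
# After the reschedule the task starts again and the id is lost:
assert ti.run_once(first_try=False) is None
```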

@JonnyIncognito commented on Oct 16, 2019:

XCOM data appears to be cleared at start of each task. See here.

So when task restarts after reschedule, we lose the resource id.

Ouch, that is quite a limitation for using XCom. Is there anywhere else to store task state? If not, some options:

  1. Use mode='poke', at the expense of using up a task slot for long-running tasks. Not ideal, but gets "correct" behaviour. It'd then be "atomic" rather than "async" behaviour.

  2. Enhance TaskInstance to make clearing XCom data conditional by delegating it to the task/operator. There could be a new function like BaseOperator.pre_execute_clear_state() which can be overridden by implementers. BaseOperator is already aware of / coupled with TaskInstance, so I don't think we'd be breaking separation of concerns any more than it already is?

  3. There might be enough justification to say that for rescheduled tasks (i.e. transition from State.UP_FOR_RESCHEDULE to State.RUNNING) then TaskInstance shouldn't clear XCom. The call to run() -> _check_and_change_state_before_execution() does know about this state change, but I see in the code that there are places which bypass the call to run() and go directly to _run_raw_task() (e.g. the CLI).

Seems that some form of option 2 is the least risky way to get the desired outcome.
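Option 2 might look something like the sketch below; `pre_execute_clear_state()` and the stub classes are hypothetical names, not existing Airflow API:

```python
# Sketch of option 2: TaskInstance asks the operator whether to clear
# XCom instead of clearing it unconditionally. All names are hypothetical.

class BaseOperator:
    def pre_execute_clear_state(self) -> bool:
        """True if XCom for this task should be cleared before execution.

        The default preserves current behaviour; reschedule-aware
        operators override it to keep state across reschedules.
        """
        return True


class RescheduleAwareOperator(BaseOperator):
    def pre_execute_clear_state(self) -> bool:
        return False  # keep the external resource id across reschedules


class TaskInstance:
    def __init__(self, task):
        self.task = task
        self.xcom = {'external_resource_id': 'abc123'}

    def _run_raw_task(self):
        # Delegate the clearing decision to the operator.
        if self.task.pre_execute_clear_state():
            self.xcom.clear()


inst = TaskInstance(RescheduleAwareOperator())
inst._run_raw_task()
assert inst.xcom == {'external_resource_id': 'abc123'}
```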

@JonnyIncognito commented on Oct 16, 2019:

Ref. State.UP_FOR_RESCHEDULE, this was added back in January by @seelmann as part of AIRFLOW-2747. Perhaps he has some thoughts on whether or not we can change the contract for rescheduled tasks to say that we retain XCom state? It's a fairly recent feature, so that might be okay, e.g. operators may not be depending on the current behaviour.

Contributor Author:

It seems to me we definitely want to wipe XComs at the end of a DagRun, so as not to affect a rerun of this DAG for this execution date.

Contributor Author:

What kind of consensus should we build on that kind of change for XCom in reschedule scenarios?

Contributor:

We should be careful with renaming classes. I think the async capabilities should be merged into the BaseOperator.

Reply:

I agree with trying to put the behaviour into BaseOperator. As it stands, the use-cases are:

  1. Only operate (BaseOperator <- e.g. EmrAddStepsOperator)
  2. Only sense (BaseSensorOperator <- e.g. EmrStepSensor)
  3. Do both (to be determined)
  4. Do none (BaseOperator <- e.g. DummyOperator)

Currently 1, 2 and 4 are implemented as variations of execute(); 2 is the odd one out in having a special class, as 3 would be with the current proposal.

If BaseOperator had a default execute() that runs two phases, the other behaviours can be achieved by optionally implementing either of the phases.

It's a typical problem using class hierarchies for behaviours, that it's hard to mix and match. I was actually thinking about whether we could use traits to keep the behaviours cleanly separated into their own implementations. But it's probably overkill with so few.

Reply:

Something else to consider about a potential "multi phase" BaseOperator is that we already have pre_execute() + execute() + post_execute(), and the order is governed by TaskInstance._run_raw_task() (link). My understanding is that these hooks are always called, even when tasks are rescheduled. I'm not entirely sure of their purpose; the docs simply say "... it’s mostly a hook for people deriving operators", and the only two places I can see in the source code where they're used are: CloudSqlQueryOperator, to ensure the db connection is created and torn down; and Lineage support, to ensure the lists of inlets and outlets are built and saved to XCom (another kind of state retention!).

When considering extra phases, they'd be orthogonal to above and really some form of "execution phase N": the BaseOperator starts to behave a bit like a SubDag where the overall task status / state machine runs its course while what would have been sub-tasks in the SubDag now run as sequential execution phases in the same task. Instead of the sub-tasks being visible in the UI as DAG nodes, these execution phases are internalised into the BaseOperator.

Why would we want to do that? The justification is that these execution phases are tightly coupled and should be retried together (and in order) as some kind of atomic unit.

If we were to generalise this much, I think we should support being able to run through the execution phases synchronously (taking up a task slot) or asynchronously (using reschedule).

Sorry this is getting a bit philosophical but I'm trying to provide some guiding context about why we're doing this! I also see it as a potential future foundation for a mechanism to compose operators via the Airflow API.

@JonnyIncognito commented on Oct 25, 2019:

Back to the practical, can I propose:

  • New BaseOperator hooks are implemented - something like execute_phase_start() and execute_phase_final() - that by default simply raise NotImplementedError() as per current execute() implementation

  • New BaseOperator default execute() implementation that calls the two hooks in order. If the task was rescheduled, skip the first phase. I've seen code that detects this case in BaseSensorOperator.execute() (link).

  • Existing overrides of BaseOperator.execute() should then behave as-is, i.e. the phase hooks are simply never called. The current BaseSensorOperator implementation would continue to work, minimising the amount of work needed in this PR; it could later be reimplemented to use the new phased execution.

  • Any new operators implemented to combine an operation + sensor can override the new phase hooks, since they'd be multi-phase aware.

Allowing only two phases simplifies the implementation for now given the immediate requirements, but could even be extended in future for any number of phases if we wanted to store the current/next phase as part of the task instance state. I can't think of a good practical need right now.

What do you guys think, is this proposal (or something like it) workable?
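A minimal sketch of this proposal, using the hypothetical hook names above, with plain-Python stubs standing in for the real Airflow classes:

```python
# Sketch of the two-hook proposal: execute() runs both phases in order
# and skips the first phase when the task instance has already been
# rescheduled, mirroring the detection in BaseSensorOperator.execute().

class TaskReschedule:
    records = []  # stub for the task_reschedule table

    @staticmethod
    def find_for_task_instance(ti):
        return TaskReschedule.records


class PhasedOperator:
    def execute_phase_start(self, context):
        raise NotImplementedError()

    def execute_phase_final(self, context):
        raise NotImplementedError()

    def execute(self, context):
        if not TaskReschedule.find_for_task_instance(context['ti']):
            self.execute_phase_start(context)
        return self.execute_phase_final(context)


class SubmitThenWait(PhasedOperator):
    def __init__(self):
        self.calls = []

    def execute_phase_start(self, context):
        self.calls.append('start')

    def execute_phase_final(self, context):
        self.calls.append('final')


op = SubmitThenWait()
op.execute({'ti': object()})         # first run: both phases
TaskReschedule.records.append('r1')  # simulate a reschedule
op.execute({'ti': object()})         # after reschedule: final phase only
assert op.calls == ['start', 'final', 'final']
```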

Contributor:

I can't see why we need another execute_phase_{start,final}. The pre_execute() hook can be overridden by a custom implementation, but in practice this hasn't been done much.

So TaskInstance._run_raw_task() is what's being executed on the worker, and it will just run the whole task to the end.

The justification is that these execution phases are tightly coupled and should be retried together (and in order) as some kind of atomic unit.

I really agree with that. We've seen situations where a DAG would start by creating a cluster, but this breaks the atomicity of the DAG. When the cluster is killed (maybe because of an idle timeout), the only way to retry the workflow is by restarting the whole DAG.

It is still not clear to me what the hierarchy will look like. I think we should move most of the reschedule logic into the BaseOperator. Maybe we can enrich the context which is being passed to the execute method with reschedule logic, to let the implementation of the execute step know that you're rescheduling.
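Enriching the context as suggested might look like this sketch; the `rescheduling` key and helper are hypothetical names:

```python
# Sketch: pass a reschedule flag through the template context so that an
# execute() implementation knows it is resuming after a reschedule.
# The 'rescheduling' key is a hypothetical name.

def build_context(ti, task_reschedules):
    return {'ti': ti, 'rescheduling': bool(task_reschedules)}


class MyOperator:
    def execute(self, context):
        if context['rescheduling']:
            return 'resume polling'
        return 'submit request'


op = MyOperator()
assert op.execute(build_context(ti=None, task_reschedules=[])) == 'submit request'
assert op.execute(build_context(ti=None, task_reschedules=['r1'])) == 'resume polling'
```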

class BaseAsyncOperator(BaseSensorOperator, SkipMixin):


Contributor Author:

I had a similar thought process. Where I ended up was that the implementation of the common parent would be nearly identical to BaseSensorOperator. The logic being that BaseAsyncOperator just implements a few more methods (e.g. [submit/process]_request) and is opinionated that mode=reschedule.
The main reason to subclass vs. enhancing the existing Sensor is that I wanted to use the same execute method but have a class nomenclature indicating that subclasses of this are intended to take action. I think people are less likely to understand that a Sensor can take action rather than being a read-only poller.

The spirit of this class is to provide an improved way to do what we'd traditionally do with Operators, so I wanted a class with the name Operator. Perhaps this is short-sighted.

I've refactored to do the submit / process in the execute method of BaseSensorOperator.

@JonnyIncognito commented on Oct 16, 2019:

Ref. the name, I agree that it's important for people to consider this as something that takes action and that's implied by the Operator suffix. That's why I consider BaseSensorOperator in its current form to have a misleading name: it implies that it's both for taking action and sensing. Luckily the actual derived sensor implementations are all suffixed with Sensor - e.g. EmrStepSensor - which doesn't mislead.

I feel like having "Async" in your new class highlights the wrong thing. It's not that it is async in the sense of non-synchronous or rescheduled (Airflow terminology) that makes this class unique, since all sensors derived from BaseSensorOperator can operate as async/rescheduled: it implies not occupying a task slot. What differentiates your class is that it combines action and sensing. That's why I tend to think that the already used BaseSensorOperator would actually be a great name to describe what your new class does; so why not enhance the existing class to optionally do the action part?

There might be technical reasons to have a separate class - e.g. code is easier to read or to reduce risk by extending rather than modifying (SOLID) - but I can't think of a better alternative to "Async": BaseDoThenWaitOperator (hah); BaseActionSensor; maybe BaseAtomicOperator; eventually back to BaseSensorOperator.

Contributor:

Is there a need to reference SkipMixin, given that BaseSensorOperator already extends that class?

"""
AsyncOperators are derived from this class and inherit these attributes.

AsyncOperators must define a `submit_request` to fire a request for a
long running operation with a method and then executes a `poke` method
executing at a time interval and succeed when a criteria is met and fail
if and when they time out. They are effctively an opinionated way use
combine an Operator and a Sensor in order to kick off a long running
process without blocking a worker slot while waiting for the long running
process to complete by leveraging reschedule mode.

:param soft_fail: Set to true to mark the task as SKIPPED on failure
:type soft_fail: bool
:param poke_interval: Time in seconds that the job should wait in
between each tries
:type poke_interval: int
:param timeout: Time, in seconds before the task times out and fails.
:type timeout: int
"""
    ui_color = '#9933ff'  # type: str

    @apply_defaults
    def __init__(self,
                 *args,
                 **kwargs) -> None:
        super().__init__(*args, mode='reschedule', **kwargs)

    @abstractmethod
Contributor:
Should we extend ABC in this class? Maybe there is a reason we don't do this?

    def submit_request(self, context) -> Optional[Union[str, List, Dict]]:
        """
        This method should kick off a long-running operation and return the
        ID for that operation, if applicable.
        Context is the same dictionary used as when rendering jinja templates.

        Refer to get_template_context for more context.

        :returns: a resource_id for the long-running operation.
        :rtype: str
        """
        raise AirflowException(
            'Async Operators must override the `submit_request` method.')

    def process_result(self, context):
        """
        This method can optionally be overridden to process the result of a
        long-running operation.
        Context is the same dictionary used as when rendering jinja templates.

        Refer to get_template_context for more context.
        """
        self.log.info('Using default process_result. Got result of %s. Done.',
                      self.get_external_resource_id(context))

    def execute(self, context):
        # On the first execute call submit_request and set the
        # external resource id.
        task_reschedules = TaskReschedule.find_for_task_instance(context['ti'])
        if not task_reschedules:
            resource_id = self.submit_request(context)
            if not resource_id:
                resource_id = PLACEHOLDER_RESOURCE_ID
            self.set_external_resource_id(context, resource_id)

        super().execute(context)

        # TODO(jaketf) validate comment below w/ tests.
        # The above should raise AirflowRescheduleException if we are
        # rescheduling a poke, and thus never reach this code below.
        try:
            resource_id = self.get_external_resource_id(context)
            if resource_id == PLACEHOLDER_RESOURCE_ID:
                self.log.info("Calling process_result.")
            else:
                self.log.info("Calling process_result for %s.", resource_id)
            self.process_result(context)
        finally:
            # Clear the resource id for this task.
            self.set_external_resource_id(context, None)

    @staticmethod
    def set_external_resource_id(context, value):
        return context['ti'].xcom_push(key=XCOM_EXTERNAL_RESOURCE_ID_KEY,
                                       value=value)

    @staticmethod
    def get_external_resource_id(context):
        return context['ti'].xcom_pull(task_ids=context['task'].task_id,
                                       key=XCOM_EXTERNAL_RESOURCE_ID_KEY)
1 change: 1 addition & 0 deletions airflow/models/xcom.py
@@ -36,6 +36,7 @@
# https://github.com/apache/airflow/pull/1618#discussion_r68249677
MAX_XCOM_SIZE = 49344
XCOM_RETURN_KEY = 'return_value'
XCOM_EXTERNAL_RESOURCE_ID_KEY = 'external_resource_id'


class XCom(Base, LoggingMixin):
6 changes: 5 additions & 1 deletion airflow/sensors/base_sensor_operator.py
@@ -24,7 +24,8 @@

from airflow.exceptions import AirflowException, AirflowSensorTimeout, \
    AirflowSkipException, AirflowRescheduleException
from airflow.models import BaseOperator, SkipMixin, TaskReschedule
from airflow.models import BaseOperator, SkipMixin, TaskReschedule, \
    BaseAsyncOperator
from airflow.utils import timezone
from airflow.utils.decorators import apply_defaults
from airflow.ti_deps.deps.ready_to_reschedule import ReadyToRescheduleDep
@@ -121,8 +122,11 @@ def execute(self, context: Dict) -> None:
                raise AirflowRescheduleException(reschedule_date)
            else:
                sleep(self.poke_interval)

        self.log.info("Success criteria met. Exiting.")
    def _do_skip_downstream_tasks(self, context: Dict) -> None:
        downstream_tasks = context['task'].get_flat_relatives(upstream=False)
        self.log.debug("Downstream task_ids %s", downstream_tasks)