Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a Docker Taskflow decorator #15330

Merged
merged 22 commits into from
Sep 20, 2021
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 22 additions & 115 deletions airflow/decorators/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,129 +15,36 @@
# specific language governing permissions and limitations
# under the License.

from typing import Callable, Dict, Iterable, List, Optional, Union
from typing import TYPE_CHECKING

from airflow.decorators.python import python_task
from airflow.decorators.python_virtualenv import _virtualenv_task
from airflow.decorators.python import PythonDecoratorMixin, python_task # noqa
from airflow.decorators.python_virtualenv import PythonVirtualenvDecoratorMixin
from airflow.decorators.task_group import task_group # noqa
from airflow.models.dag import dag # noqa
from airflow.providers_manager import ProvidersManager


class _TaskDecorator:
def __call__(
self, python_callable: Optional[Callable] = None, multiple_outputs: Optional[bool] = None, **kwargs
):
"""
Python operator decorator. Wraps a function into an Airflow operator.
Accepts kwargs for operator kwarg. This decorator can be reused in a single DAG.
class _TaskDecorator(PythonDecoratorMixin, PythonVirtualenvDecoratorMixin):
def __getattr__(self, name):
if name.startswith("__"):
raise AttributeError(f'{type(self).__name__} has no attribute {name!r}')
decorators = ProvidersManager().taskflow_decorators
if name not in decorators:
raise AttributeError(f"task decorator {name!r} not found")
return decorators[name]

:param python_callable: Function to decorate
:type python_callable: Optional[Callable]
:param multiple_outputs: if set, function return value will be
unrolled to multiple XCom values. List/Tuples will unroll to xcom values
with index as key. Dict will unroll to xcom values with keys as XCom keys.
Defaults to False.
:type multiple_outputs: bool
"""
return self.python(python_callable=python_callable, multiple_outputs=multiple_outputs, **kwargs)

@staticmethod
def python(python_callable: Optional[Callable] = None, multiple_outputs: Optional[bool] = None, **kwargs):
"""
Python operator decorator. Wraps a function into an Airflow operator.
Accepts kwargs for operator kwarg. This decorator can be reused in a single DAG.
# [START mixin_for_autocomplete]
if TYPE_CHECKING:
try:
from airflow.providers.docker.decorators.docker import DockerDecoratorMixin

:param python_callable: Function to decorate
:type python_callable: Optional[Callable]
:param multiple_outputs: if set, function return value will be
unrolled to multiple XCom values. List/Tuples will unroll to xcom values
with index as key. Dict will unroll to xcom values with keys as XCom keys.
Defaults to False.
:type multiple_outputs: bool
"""
return python_task(python_callable=python_callable, multiple_outputs=multiple_outputs, **kwargs)

@staticmethod
def virtualenv(
python_callable: Optional[Callable] = None,
multiple_outputs: Optional[bool] = None,
requirements: Optional[Iterable[str]] = None,
python_version: Optional[Union[str, int, float]] = None,
use_dill: bool = False,
system_site_packages: bool = True,
string_args: Optional[Iterable[str]] = None,
templates_dict: Optional[Dict] = None,
templates_exts: Optional[List[str]] = None,
**kwargs,
):
"""
Allows one to run a function in a virtualenv that is
created and destroyed automatically (with certain caveats).
The function must be defined using def, and not be
part of a class. All imports must happen inside the function
and no variables outside of the scope may be referenced. A global scope
variable named virtualenv_string_args will be available (populated by
string_args). In addition, one can pass stuff through op_args and op_kwargs, and one
can use a return value.
Note that if your virtualenv runs in a different Python major version than Airflow,
you cannot use return values, op_args, op_kwargs, or use any macros that are being provided to
Airflow through plugins. You can use string_args though.
.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:PythonVirtualenvOperator`
:param python_callable: A python function with no references to outside variables,
defined with def, which will be run in a virtualenv
:type python_callable: function
:param multiple_outputs: if set, function return value will be
unrolled to multiple XCom values. List/Tuples will unroll to xcom values
with index as key. Dict will unroll to xcom values with keys as XCom keys.
Defaults to False.
:type multiple_outputs: bool
:param requirements: A list of requirements as specified in a pip install command
:type requirements: list[str]
:param python_version: The Python version to run the virtualenv with. Note that
both 2 and 2.7 are acceptable forms.
:type python_version: Optional[Union[str, int, float]]
:param use_dill: Whether to use dill to serialize
the args and result (pickle is default). This allow more complex types
but requires you to include dill in your requirements.
:type use_dill: bool
:param system_site_packages: Whether to include
system_site_packages in your virtualenv.
See virtualenv documentation for more information.
:type system_site_packages: bool
:param op_args: A list of positional arguments to pass to python_callable.
:type op_args: list
:param op_kwargs: A dict of keyword arguments to pass to python_callable.
:type op_kwargs: dict
:param string_args: Strings that are present in the global var virtualenv_string_args,
available to python_callable at runtime as a list[str]. Note that args are split
by newline.
:type string_args: list[str]
:param templates_dict: a dictionary where the values are templates that
will get templated by the Airflow engine sometime between
``__init__`` and ``execute`` takes place and are made available
in your callable's context after the template has been applied
:type templates_dict: dict of str
:param templates_exts: a list of file extensions to resolve while
processing templated fields, for examples ``['.sql', '.hql']``
:type templates_exts: list[str]
"""
return _virtualenv_task(
python_callable=python_callable,
multiple_outputs=multiple_outputs,
requirements=requirements,
python_version=python_version,
use_dill=use_dill,
system_site_packages=system_site_packages,
string_args=string_args,
templates_dict=templates_dict,
templates_exts=templates_exts,
**kwargs,
)
class _DockerTask(_TaskDecorator, DockerDecoratorMixin):
pass

_TaskDecorator = _DockerTask
except ImportError:
pass
# [END mixin_for_autocomplete]

task = _TaskDecorator()
3 changes: 1 addition & 2 deletions airflow/decorators/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,7 @@ def __init__(

def execute(self, context: Dict):
return_value = super().execute(context)
self._handle_output(return_value=return_value, context=context, xcom_push=self.xcom_push)
return return_value
return self._handle_output(return_value=return_value, context=context, xcom_push=self.xcom_push)

def _handle_output(self, return_value: Any, context: Dict, xcom_push: Callable):
"""
Expand Down
40 changes: 39 additions & 1 deletion airflow/decorators/python.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,49 @@ def __init__(
T = TypeVar("T", bound=Callable)


class PythonDecoratorMixin:
"""
Helper class for inheritance. This class is only used for the __init__.pyi so that IDEs
will autocomplete docker decorator functions
:meta private:
"""

def __call__(
self, python_callable: Optional[Callable] = None, multiple_outputs: Optional[bool] = None, **kwargs
):
return self.python(python_callable, multiple_outputs, **kwargs)

def python(
self, python_callable: Optional[Callable] = None, multiple_outputs: Optional[bool] = None, **kwargs
):
"""
Python operator decorator. Wraps a function into an Airflow operator.
Accepts kwargs for operator kwarg. Can be reused in a single DAG.
:param python_callable: Function to decorate
:type python_callable: Optional[Callable]
:param multiple_outputs: if set, function return value will be
unrolled to multiple XCom values. List/Tuples will unroll to xcom values
with index as key. Dict will unroll to xcom values with keys as XCom keys.
Defaults to False.
:type multiple_outputs: bool
"""
return task_decorator_factory(
python_callable=python_callable,
multiple_outputs=multiple_outputs,
decorated_operator_class=_PythonDecoratedOperator,
**kwargs,
)


def python_task(
python_callable: Optional[Callable] = None, multiple_outputs: Optional[bool] = None, **kwargs
):
"""
Python operator decorator. Wraps a function into an Airflow operator.
Wraps a function into an Airflow operator.
Accepts kwargs for operator kwarg. Can be reused in a single DAG.
:param python_callable: Function to decorate
Expand Down
45 changes: 27 additions & 18 deletions airflow/decorators/python_virtualenv.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,24 +70,33 @@ def get_python_source(self):
T = TypeVar("T", bound=Callable)


def _virtualenv_task(
python_callable: Optional[Callable] = None, multiple_outputs: Optional[bool] = None, **kwargs
):
class PythonVirtualenvDecoratorMixin:
"""
Python operator decorator. Wraps a function into an Airflow operator.
Accepts kwargs for operator kwarg. Can be reused in a single DAG.
Helper class for inheritance. This class is only used for the __init__.pyi so that IDEs
will autocomplete docker decorator functions
:param python_callable: Function to decorate
:type python_callable: Optional[Callable]
:param multiple_outputs: if set, function return value will be
unrolled to multiple XCom values. List/Tuples will unroll to xcom values
with index as key. Dict will unroll to xcom values with keys as XCom keys.
Defaults to False.
:type multiple_outputs: bool
:meta private:
"""
return task_decorator_factory(
python_callable=python_callable,
multiple_outputs=multiple_outputs,
decorated_operator_class=_PythonVirtualenvDecoratedOperator,
**kwargs,
)

def virtualenv(
self, python_callable: Optional[Callable] = None, multiple_outputs: Optional[bool] = None, **kwargs
):
"""
Wraps a python function into an Airflow operator to run via a Python virtual environment.
Accepts kwargs for operator kwarg. Can be reused in a single DAG.
:param python_callable: Function to decorate
:type python_callable: Optional[Callable]
:param multiple_outputs: if set, function return value will be
unrolled to multiple XCom values. List/Tuples will unroll to xcom values
with index as key. Dict will unroll to xcom values with keys as XCom keys.
Defaults to False.
:type multiple_outputs: bool
"""
return task_decorator_factory(
python_callable=python_callable,
multiple_outputs=multiple_outputs,
decorated_operator_class=_PythonVirtualenvDecoratedOperator,
**kwargs,
)
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ def extract():
# [END extract_virtualenv]

# [START transform_docker]
@task.virtualenv(multiple_outputs=True)
@task.docker(image='python:3.9-slim-buster', multiple_outputs=True)
def transform(order_data_dict: dict):
"""
#### Transform task
Expand Down
12 changes: 12 additions & 0 deletions airflow/provider.yaml.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,18 @@
"type": "object",
"description": "Additional extras that the provider should have"
},
"task-decorators": {
"type": "array",
"description": "Decorators to use with the TaskFlow API. Can be accessed by users via '@task.<name>'",
"items": {
"name": {
"type": "string"
},
"path": {
"type": "string"
}
}
},
"secrets-backends": {
"type": "array",
"description": "Secrets Backend class names",
Expand Down
12 changes: 12 additions & 0 deletions airflow/provider_info.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,18 @@
"items": {
"type": "string"
}
},
"task-decorators": {
"type": "array",
"description": "Apply custom decorators to the TaskFlow API. Can be accessed by users via '@task.<name>'",
"items": {
"name": {
"type": "string"
},
"path": {
"type": "string"
}
}
}
},
"required": [
Expand Down
17 changes: 17 additions & 0 deletions airflow/providers/docker/decorators/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
Loading