Skip to content

Commit 63aa3db

Browse files
authored
[AIRFLOW-6258] Add CloudFormation operators to AWS providers (apache#6824)
1 parent 13d419a commit 63aa3db

File tree

7 files changed

+629
-0
lines changed

7 files changed

+629
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
# -*- coding: utf-8 -*-
2+
#
3+
# Licensed to the Apache Software Foundation (ASF) under one
4+
# or more contributor license agreements. See the NOTICE file
5+
# distributed with this work for additional information
6+
# regarding copyright ownership. The ASF licenses this file
7+
# to you under the Apache License, Version 2.0 (the
8+
# "License"); you may not use this file except in compliance
9+
# with the License. You may obtain a copy of the License at
10+
#
11+
# http://www.apache.org/licenses/LICENSE-2.0
12+
#
13+
# Unless required by applicable law or agreed to in writing,
14+
# software distributed under the License is distributed on an
15+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
# KIND, either express or implied. See the License for the
17+
# specific language governing permissions and limitations
18+
# under the License.
19+
20+
"""
21+
This module contains AWS CloudFormation Hook
22+
"""
23+
from botocore.exceptions import ClientError
24+
25+
from airflow.contrib.hooks.aws_hook import AwsHook
26+
27+
28+
class AWSCloudFormationHook(AwsHook):
29+
"""
30+
Interact with AWS CloudFormation.
31+
"""
32+
33+
def __init__(self, region_name=None, *args, **kwargs):
34+
self.region_name = region_name
35+
self.conn = None
36+
super().__init__(*args, **kwargs)
37+
38+
def get_conn(self):
39+
if not self.conn:
40+
self.conn = self.get_client_type('cloudformation', self.region_name)
41+
return self.conn
42+
43+
def get_stack_status(self, stack_name):
44+
"""
45+
Get stack status from CloudFormation.
46+
"""
47+
cloudformation = self.get_conn()
48+
49+
self.log.info('Poking for stack %s', stack_name)
50+
51+
try:
52+
stacks = cloudformation.describe_stacks(StackName=stack_name)['Stacks']
53+
return stacks[0]['StackStatus']
54+
except ClientError as e:
55+
if 'does not exist' in str(e):
56+
return None
57+
else:
58+
raise e
59+
60+
def create_stack(self, stack_name, params):
61+
"""
62+
Create stack in CloudFormation.
63+
64+
:param stack_name: stack_name.
65+
:type stack_name: str
66+
:param params: parameters to be passed to CloudFormation.
67+
:type params: dict
68+
"""
69+
70+
if 'StackName' not in params:
71+
params['StackName'] = stack_name
72+
self.get_conn().create_stack(**params)
73+
74+
def delete_stack(self, stack_name, params=None):
75+
"""
76+
Delete stack in CloudFormation.
77+
78+
:param stack_name: stack_name.
79+
:type stack_name: str
80+
:param params: parameters to be passed to CloudFormation (optional).
81+
:type params: dict
82+
"""
83+
84+
params = params or {}
85+
if 'StackName' not in params:
86+
params['StackName'] = stack_name
87+
self.get_conn().delete_stack(**params)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
# -*- coding: utf-8 -*-
2+
#
3+
# Licensed to the Apache Software Foundation (ASF) under one
4+
# or more contributor license agreements. See the NOTICE file
5+
# distributed with this work for additional information
6+
# regarding copyright ownership. The ASF licenses this file
7+
# to you under the Apache License, Version 2.0 (the
8+
# "License"); you may not use this file except in compliance
9+
# with the License. You may obtain a copy of the License at
10+
#
11+
# http://www.apache.org/licenses/LICENSE-2.0
12+
#
13+
# Unless required by applicable law or agreed to in writing,
14+
# software distributed under the License is distributed on an
15+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
# KIND, either express or implied. See the License for the
17+
# specific language governing permissions and limitations
18+
# under the License.
19+
"""
20+
This module contains CloudFormation create/delete stack operators.
21+
"""
22+
from typing import List
23+
24+
from airflow.models import BaseOperator
25+
from airflow.providers.amazon.aws.hooks.cloud_formation import AWSCloudFormationHook
26+
from airflow.utils.decorators import apply_defaults
27+
28+
29+
class CloudFormationCreateStackOperator(BaseOperator):
30+
"""
31+
An operator that creates a CloudFormation stack.
32+
33+
:param stack_name: stack name (templated)
34+
:type stack_name: str
35+
:param params: parameters to be passed to CloudFormation.
36+
37+
.. seealso::
38+
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/cloudformation.html#CloudFormation.Client.create_stack
39+
:type params: dict
40+
:param aws_conn_id: aws connection to uses
41+
:type aws_conn_id: str
42+
"""
43+
template_fields: List[str] = ['stack_name']
44+
template_ext = ()
45+
ui_color = '#6b9659'
46+
47+
@apply_defaults
48+
def __init__(
49+
self,
50+
stack_name,
51+
params,
52+
aws_conn_id='aws_default',
53+
*args, **kwargs):
54+
super().__init__(*args, **kwargs)
55+
self.stack_name = stack_name
56+
self.params = params
57+
self.aws_conn_id = aws_conn_id
58+
59+
def execute(self, context):
60+
self.log.info('Parameters: %s', self.params)
61+
62+
cloudformation_hook = AWSCloudFormationHook(aws_conn_id=self.aws_conn_id)
63+
cloudformation_hook.create_stack(self.stack_name, self.params)
64+
65+
66+
class CloudFormationDeleteStackOperator(BaseOperator):
67+
"""
68+
An operator that deletes a CloudFormation stack.
69+
70+
:param stack_name: stack name (templated)
71+
:type stack_name: str
72+
:param params: parameters to be passed to CloudFormation.
73+
74+
.. seealso::
75+
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/cloudformation.html#CloudFormation.Client.delete_stack
76+
:type params: dict
77+
:param aws_conn_id: aws connection to uses
78+
:type aws_conn_id: str
79+
"""
80+
template_fields: List[str] = ['stack_name']
81+
template_ext = ()
82+
ui_color = '#1d472b'
83+
ui_fgcolor = '#FFF'
84+
85+
@apply_defaults
86+
def __init__(
87+
self,
88+
stack_name,
89+
params=None,
90+
aws_conn_id='aws_default',
91+
*args, **kwargs):
92+
super().__init__(*args, **kwargs)
93+
self.params = params or {}
94+
self.stack_name = stack_name
95+
self.params = params
96+
self.aws_conn_id = aws_conn_id
97+
98+
def execute(self, context):
99+
self.log.info('Parameters: %s', self.params)
100+
101+
cloudformation_hook = AWSCloudFormationHook(aws_conn_id=self.aws_conn_id)
102+
cloudformation_hook.delete_stack(self.stack_name, self.params)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
# -*- coding: utf-8 -*-
2+
#
3+
# Licensed to the Apache Software Foundation (ASF) under one
4+
# or more contributor license agreements. See the NOTICE file
5+
# distributed with this work for additional information
6+
# regarding copyright ownership. The ASF licenses this file
7+
# to you under the Apache License, Version 2.0 (the
8+
# "License"); you may not use this file except in compliance
9+
# with the License. You may obtain a copy of the License at
10+
#
11+
# http://www.apache.org/licenses/LICENSE-2.0
12+
#
13+
# Unless required by applicable law or agreed to in writing,
14+
# software distributed under the License is distributed on an
15+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
# KIND, either express or implied. See the License for the
17+
# specific language governing permissions and limitations
18+
# under the License.
19+
"""
20+
This module contains sensors for AWS CloudFormation.
21+
"""
22+
from airflow.providers.amazon.aws.hooks.cloud_formation import AWSCloudFormationHook
23+
from airflow.sensors.base_sensor_operator import BaseSensorOperator
24+
from airflow.utils.decorators import apply_defaults
25+
26+
27+
class CloudFormationCreateStackSensor(BaseSensorOperator):
28+
"""
29+
Waits for a stack to be created successfully on AWS CloudFormation.
30+
31+
:param stack_name: The name of the stack to wait for (templated)
32+
:type stack_name: str
33+
:param aws_conn_id: ID of the Airflow connection where credentials and extra configuration are
34+
stored
35+
:type aws_conn_id: str
36+
:param poke_interval: Time in seconds that the job should wait between each try
37+
:type poke_interval: int
38+
"""
39+
40+
template_fields = ['stack_name']
41+
ui_color = '#C5CAE9'
42+
43+
@apply_defaults
44+
def __init__(self,
45+
stack_name,
46+
aws_conn_id='aws_default',
47+
region_name=None,
48+
*args,
49+
**kwargs):
50+
super().__init__(*args, **kwargs)
51+
self.stack_name = stack_name
52+
self.hook = AWSCloudFormationHook(aws_conn_id=aws_conn_id, region_name=region_name)
53+
54+
def poke(self, context):
55+
stack_status = self.hook.get_stack_status(self.stack_name)
56+
if stack_status == 'CREATE_COMPLETE':
57+
return True
58+
if stack_status in ('CREATE_IN_PROGRESS', None):
59+
return False
60+
raise ValueError(f'Stack {self.stack_name} in bad state: {stack_status}')
61+
62+
63+
class CloudFormationDeleteStackSensor(BaseSensorOperator):
64+
"""
65+
Waits for a stack to be deleted successfully on AWS CloudFormation.
66+
67+
:param stack_name: The name of the stack to wait for (templated)
68+
:type stack_name: str
69+
:param aws_conn_id: ID of the Airflow connection where credentials and extra configuration are
70+
stored
71+
:type aws_conn_id: str
72+
:param poke_interval: Time in seconds that the job should wait between each try
73+
:type poke_interval: int
74+
"""
75+
76+
template_fields = ['stack_name']
77+
ui_color = '#C5CAE9'
78+
79+
@apply_defaults
80+
def __init__(self,
81+
stack_name,
82+
aws_conn_id='aws_default',
83+
region_name=None,
84+
*args,
85+
**kwargs):
86+
super().__init__(*args, **kwargs)
87+
self.stack_name = stack_name
88+
self.hook = AWSCloudFormationHook(aws_conn_id=aws_conn_id, region_name=region_name)
89+
90+
def poke(self, context):
91+
stack_status = self.hook.get_stack_status(self.stack_name)
92+
if stack_status in ('DELETE_COMPLETE', None):
93+
return True
94+
if stack_status == 'DELETE_IN_PROGRESS':
95+
return False
96+
raise ValueError(f'Stack {self.stack_name} in bad state: {stack_status}')

docs/operators-and-hooks-ref.rst

+6
Original file line numberDiff line numberDiff line change
@@ -331,6 +331,12 @@ These integrations allow you to perform various operations within the Amazon Web
331331
- :mod:`airflow.providers.amazon.aws.operators.athena`
332332
- :mod:`airflow.providers.amazon.aws.sensors.athena`
333333

334+
* - `Amazon CloudFormation <https://aws.amazon.com/cloudformation/>`__
335+
-
336+
- :mod:`airflow.providers.amazon.aws.hooks.cloud_formation`
337+
- :mod:`airflow.providers.amazon.aws.operators.cloud_formation`
338+
- :mod:`airflow.providers.amazon.aws.sensors.cloud_formation`
339+
334340
* - `Amazon CloudWatch Logs <https://aws.amazon.com/cloudwatch/>`__
335341
-
336342
- :mod:`airflow.providers.amazon.aws.hooks.logs`

0 commit comments

Comments
 (0)