ThrottlingException: when creating and checking status of EMR task from airflow #34459
-
A ThrottlingException from botocore is usually a strong sign that something is being done wrong.
-
Thanks for responding to the issue. I would like to provide more context on the problem, but before that, here is the full function that I am using.

I am using the above function to create an EMR cluster and wait for it to be created. I could not find any other way to do this (maybe I missed the latest updates; please let me know if there is another way). As part of the requirement, we have to create 30 to 40 EMR clusters in parallel from Airflow and submit jobs to each cluster. We are using a config-driven EMR script in Airflow that creates 30 clusters in parallel.

Regarding your comment, I have a question: with job_flow_id = create_emr.execute(context), how does this code execute continuously, given that it is a one-time activity, unless I am using the sensor as above? I went through the EMR source code but could not find anything. Thanks
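For the parallel-creation scenario described above, one common way to absorb intermittent "Rate exceeded" responses is client-side retry with exponential backoff and jitter, so that 30 to 40 tasks do not all retry at the same instant. The following is a minimal, standard-library sketch of that idea, not the provider's implementation: `ThrottledError`, `call_with_backoff`, `make_flaky_create`, and the job flow id are hypothetical names used only for illustration, and the fake `create` function stands in for the real RunJobFlow call.

```python
import random
import time


class ThrottledError(Exception):
    """Hypothetical stand-in for a botocore ThrottlingException."""


def call_with_backoff(func, max_attempts=10, base_delay=0.5, max_delay=20.0, sleep=time.sleep):
    """Call func(), retrying on ThrottledError with exponential backoff and full jitter."""
    for attempt in range(1, max_attempts + 1):
        try:
            return func()
        except ThrottledError:
            if attempt == max_attempts:
                raise  # out of attempts: surface the error to the caller
            # The delay ceiling doubles on each attempt, capped at max_delay.
            ceiling = min(max_delay, base_delay * 2 ** (attempt - 1))
            # Full jitter: sleep a random amount in [0, ceiling] so that
            # parallel tasks spread their retries out over time.
            sleep(random.uniform(0, ceiling))


def make_flaky_create(failures):
    """Build a fake 'create cluster' call that is throttled `failures` times."""
    state = {"calls": 0}

    def create():
        state["calls"] += 1
        if state["calls"] <= failures:
            raise ThrottledError("Rate exceeded")
        return "j-EXAMPLE"  # made-up job flow id, for illustration only

    return create, state


if __name__ == "__main__":
    create, state = make_flaky_create(failures=3)
    # Pass a no-op sleep so the demo finishes immediately.
    print(call_with_backoff(create, sleep=lambda seconds: None), state["calls"])
```

In a real DAG the same effect is usually better achieved through the retry configuration of the boto3 client and by limiting how many cluster-creation tasks run at once, rather than by hand-rolled loops; the sketch only shows what such a retry policy does.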
-
I was unable to reproduce this on Apache Airflow 2.2.2 with Amazon Provider 2.4.0, or on the main branch. Everything is assigned as expected:
[2023-09-18, 17:40:21 UTC] {emr_create_job_flow.py:95} INFO - Creating JobFlow using aws-conn-id: aws_default, emr-conn-id: emr_default
[2023-09-18, 17:40:21 UTC] {base_aws.py:401} INFO - Airflow Connection: aws_conn_id=aws_default
>>>>>>> Config assignments
[2023-09-18, 17:40:21 UTC] {base_aws.py:413} INFO - Retrieving config_kwargs from Connection.extra_config['config_kwargs']: {'retries': {'mode': 'standard', 'max_attempts': 10}}
<<<<<<< Config assignments
[2023-09-18, 17:40:21 UTC] {base_aws.py:190} INFO - No credentials retrieved from Connection
[2023-09-18, 17:40:21 UTC] {base_aws.py:88} INFO - Retrieving region_name from Connection.extra_config['region_name']
[2023-09-18, 17:40:21 UTC] {base_aws.py:93} INFO - Creating session with aws_access_key_id=None region_name=eu-west-1
[2023-09-18, 17:40:21 UTC] {base_aws.py:168} INFO - role_arn is None
[2023-09-18, 17:40:21 UTC] {logging_mixin.py:109} WARNING - /home/airflow/.local/lib/python3.7/site-packages/airflow/providers/amazon/aws/hooks/base_aws.py:494 DeprecationWarning: client_type is deprecated. Set client_type from class attribute.
[2023-09-18, 17:40:21 UTC] {credentials.py:1102} INFO - Found credentials in environment variables.
[2023-09-18, 17:40:22 UTC] {emr_create_job_flow.py:108} INFO - JobFlow with id j-3LQTDXI39XVVV created
[2023-09-18, 17:40:22 UTC] {taskinstance.py:1280} INFO - Marking task as SUCCESS. dag_id=example_emr, task_id=create_job_flow, execution_date=20230918T174017, start_date=20230918T174021, end_date=20230918T174022
[2023-09-18, 17:40:22 UTC] {local_task_job.py:154} INFO - Task exited with return code 0
[2023-09-18, 17:40:22 UTC] {local_task_job.py:264} INFO - 0 downstream tasks scheduled from follow-on schedule check

You could check the client configuration with this simple DAG:

from datetime import datetime

from airflow.decorators import task
from airflow import DAG
from airflow.providers.amazon.aws.hooks.emr import EmrHook

with DAG(
    dag_id="check_client_config",
    schedule_interval=None,
    start_date=datetime(2021, 1, 1),
    tags=["boto3", "config-retries"],
    catchup=False,
) as dag:

    @task
    def boto3_check(aws_conn_id, emr_conn_id):
        hook = EmrHook(emr_conn_id=emr_conn_id, aws_conn_id=aws_conn_id)
        print(f"Boto3 Client Config Retries: {hook.get_conn().meta.config.retries}")

    boto3_check("aws_default", "emr_default")

[2023-09-18, 17:47:56 UTC] {base_aws.py:401} INFO - Airflow Connection: aws_conn_id=aws_default
[2023-09-18, 17:47:56 UTC] {base_aws.py:413} INFO - Retrieving config_kwargs from Connection.extra_config['config_kwargs']: {'retries': {'mode': 'standard', 'max_attempts': 10}}
[2023-09-18, 17:47:56 UTC] {base_aws.py:190} INFO - No credentials retrieved from Connection
[2023-09-18, 17:47:56 UTC] {base_aws.py:88} INFO - Retrieving region_name from Connection.extra_config['region_name']
[2023-09-18, 17:47:56 UTC] {base_aws.py:93} INFO - Creating session with aws_access_key_id=None region_name=eu-west-1
[2023-09-18, 17:47:56 UTC] {base_aws.py:168} INFO - role_arn is None
[2023-09-18, 17:47:56 UTC] {logging_mixin.py:109} WARNING - /home/airflow/.local/lib/python3.7/site-packages/airflow/providers/amazon/aws/hooks/base_aws.py:494 DeprecationWarning: client_type is deprecated. Set client_type from class attribute.
[2023-09-18, 17:47:56 UTC] {credentials.py:1102} INFO - Found credentials in environment variables.
[2023-09-18, 17:47:56 UTC] {logging_mixin.py:109} INFO - Boto3 Client Config Retries: {'mode': 'standard', 'total_max_attempts': 11}
[2023-09-18, 17:47:56 UTC] {python.py:152} INFO - Done. Returned value was: None
[2023-09-18, 17:47:56 UTC] {taskinstance.py:1280} INFO - Marking task as SUCCESS. dag_id=check_client_config, task_id=boto3_check, execution_date=20230918T174754, start_date=20230918T174756, end_date=20230918T174756
[2023-09-18, 17:47:56 UTC] {local_task_job.py:154} INFO - Task exited with return code 0
[2023-09-18, 17:47:56 UTC] {local_task_job.py:264} INFO - 0 downstream tasks scheduled from follow-on schedule check

I would recommend:
-
Apache Airflow version
Other Airflow 2 version (please specify below)
What happened
Apache Airflow Version: 2.2.2
EMR script: https://github.com/apache/airflow/blob/2.2.2/airflow/providers/amazon/aws/operators/emr_create_job_flow.py
Error:
create_emr_job_flow
job_flow_id = job_flow_creator.execute(context)
File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/providers/amazon/aws/operators/emr_create_job_flow.py", line 103, in execute
response = emr.create_job_flow(job_flow_overrides)
File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/providers/amazon/aws/hooks/emr.py", line 88, in create_job_flow
response = self.get_conn().run_job_flow(**config)
File "/usr/local/airflow/.local/lib/python3.7/site-packages/botocore/client.py", line 388, in _api_call
return self._make_api_call(operation_name, kwargs)
File "/usr/local/airflow/.local/lib/python3.7/site-packages/botocore/client.py", line 708, in _make_api_call
raise error_class(parsed_response, operation_name)
botocore.exceptions.ClientError: An error occurred (ThrottlingException) when calling the RunJobFlow operation (reached max retries: 4): Rate exceeded
[2023-09-06, 18:11:51 UTC] {{taskinstance.py:1280}} INFO - Marking task as FAILED. dag_id=DMS_source_daily, task_id=create_emr_cluster_sf_xgcb0, execution_date=20230906T181058, start_date=20230906T181137, end_date=20230906T181151
[2023-09-06, 18:11:51 UTC] {{standard_task_runner.py:91}} ERROR - Failed to execute job 118988 for task create_emr_cluster_sf_xgcb0
Traceback (most recent call last):
File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/task/task_runner/standard_task_runner.py", line 85, in _start_by_fork
args.func(args, dag=self.dag)
File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/cli/cli_parser.py", line 48, in command
return func(*args, **kwargs)
File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/utils/cli.py", line 92, in wrapper
return f(*args, **kwargs)
File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/cli/commands/task_command.py", line 292, in task_run
_run_task_by_selected_method(args, dag, ti)
File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/cli/commands/task_command.py", line 107, in _run_task_by_selected_method
_run_raw_task(args, ti)
File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/cli/commands/task_command.py", line 184, in _run_raw_task
error_file=args.error_file,
File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/utils/session.py", line 70, in wrapper
return func(*args, session=session, **kwargs)
File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1332, in _run_raw_task
self._execute_task_with_callbacks(context)
File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1458, in _execute_task_with_callbacks
result = self._execute_task(context, self.task)
File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1514, in _execute_task
result = execute_callable(context=context)
File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/operators/python.py", line 151, in execute
return_value = self.execute_callable()
File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/operators/python.py", line 162, in execute_callable
return self.python_callable(*self.op_args, **self.op_kwargs)
File "/usr/local/airflow/dags/DAG_DMS_source_daily.py", line 447, in create_emr_job_flow
job_flow_id = job_flow_creator.execute(context)
File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/providers/amazon/aws/operators/emr_create_job_flow.py", line 103, in execute
response = emr.create_job_flow(job_flow_overrides)
File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/providers/amazon/aws/hooks/emr.py", line 88, in create_job_flow
response = self.get_conn().run_job_flow(**config)
File "/usr/local/airflow/.local/lib/python3.7/site-packages/botocore/client.py", line 388, in _api_call
return self._make_api_call(operation_name, kwargs)
File "/usr/local/airflow/.local/lib/python3.7/site-packages/botocore/client.py", line 708, in _make_api_call
raise error_class(parsed_response, operation_name)
botocore.exceptions.ClientError: An error occurred (ThrottlingException) when calling the RunJobFlow operation (reached max retries: 4): Rate exceeded
As suggested by the Airflow documentation, I added the following extra field to the default AWS connection, but there was still no change:
{"config_kwargs": {"retries": {"mode": "standard", "max_attempts": 10}}}
https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/connections/aws.html#set-by-environment-variables
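A quick way to sanity-check that extra value is to parse it and work out how many attempts it asks for. The sketch below is only an illustration under stated assumptions: `expected_total_attempts` is a hypothetical helper, not part of Airflow or botocore, and it relies on botocore counting the initial request separately from `max_attempts`, which matches the `total_max_attempts: 11` reported for `max_attempts: 10` in the maintainer's log elsewhere in this thread.

```python
import json

# The "Extra" value from the report, as stored on the AWS connection.
extra = '{"config_kwargs": {"retries": {"mode": "standard", "max_attempts": 10}}}'


def expected_total_attempts(extra_json):
    """Return (mode, total attempts) that the retries config asks for."""
    retries = json.loads(extra_json)["config_kwargs"]["retries"]
    # Assumption: botocore treats max_attempts as the number of retries,
    # so the total number of attempts is max_attempts + 1.
    return retries["mode"], retries["max_attempts"] + 1


print(expected_total_attempts(extra))
```

The traceback above reports "reached max retries: 4", which does not correspond to 10 retries. That would be consistent with the client still running on default retry settings, so it may be worth confirming that the extra field is actually reaching the boto3 client in the failing environment; this reading is an inference from the logs, not something the report confirms.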
What you think should happen instead
Airflow should be able to create the EMR cluster and check the status of the EMR step without any issues. The issue occurs intermittently.
How to reproduce
from airflow.providers.amazon.aws.operators.ecs import ECSOperator
from airflow.providers.amazon.aws.operators.emr_add_steps import EmrAddStepsOperator
from airflow.providers.amazon.aws.operators.emr_create_job_flow import EmrCreateJobFlowOperator
from airflow.providers.amazon.aws.operators.emr_terminate_job_flow import EmrTerminateJobFlowOperator
from airflow.providers.amazon.aws.sensors.emr_job_flow import EmrJobFlowSensor
from airflow.providers.amazon.aws.sensors.emr_step import EmrStepSensor
create_emr = EmrCreateJobFlowOperator(
    task_id='Create_EMR_Job_Flow',
    job_flow_overrides="{" + cluster_config(**context) + "}"
)
job_flow_id = create_emr.execute(context)

add_steps_sensor = EmrStepSensor(
    task_id="watch_spark_step",
    job_flow_id="{{ task_instance.xcom_pull(task_ids='create_emr_cluster', key='emr_cluster_id') }}",
    step_id="{{ task_instance.xcom_pull(task_ids='add_spark_step', key='spark_step_id') }}",
    on_success_callback=notify_task_success_to_slack,
    on_failure_callback=notify_task_failure_to_slack
)
Operating System
MWAA
Versions of Apache Airflow Providers
apache-airflow-providers-amazon==2.4.0
Deployment
Amazon (AWS) MWAA
Deployment details
No response
Anything else
No response
Are you willing to submit PR?
Code of Conduct