forgotten in previous commit
arthurprevot committed Nov 23, 2024
1 parent 8769bc7 commit 86e1cf9
Showing 1 changed file with 135 additions and 0 deletions.
yaetos/deploy_emr.py: 135 additions & 0 deletions
@@ -72,3 +72,138 @@ def choose_cluster(self, clusters, cluster_id=None):
answer = input('Your choice ? ')
return {'id': clusters[int(answer) - 1][1],
'name': clusters[int(answer) - 1][2]}

    def start_spark_cluster(self, c, emr_version):
        """Start an EMR cluster and store its JobFlowId in self.cluster_id.

        :param c: boto3 EMR client
        :param emr_version: EMR release label, ex. 'emr-5.26.0'
        :return: None
        """
instance_groups = [{
'Name': 'EmrMaster',
'InstanceRole': 'MASTER',
'InstanceType': self.ec2_instance_master,
'InstanceCount': 1,
}]
if self.emr_core_instances != 0:
instance_groups += [{
'Name': 'EmrCore',
'InstanceRole': 'CORE',
'InstanceType': self.ec2_instance_slaves,
'InstanceCount': self.emr_core_instances,
}]

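        # Launch the cluster; run_job_flow returns immediately while AWS provisions in the background.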
response = c.run_job_flow(
Name=self.pipeline_name,
LogUri="s3://{}/{}/manual_run_logs/".format(self.s3_bucket_logs, self.metadata_folder),
ReleaseLabel=emr_version,
Instances={
'InstanceGroups': instance_groups,
'Ec2KeyName': self.ec2_key_name,
'KeepJobFlowAliveWhenNoSteps': self.deploy_args.get('leave_on', False),
'Ec2SubnetId': self.ec2_subnet_id,
# 'AdditionalMasterSecurityGroups': self.extra_security_gp, # TODO : make optional in future. "[self.extra_security_gp] if self.extra_security_gp else []" doesn't work.
},
Applications=self.emr_applications, # should be at a minimum [{'Name': 'Hadoop'}, {'Name': 'Spark'}],
Configurations=[
{ # Section to force python3 since emr-5.x uses python2 by default.
"Classification": "spark-env",
"Configurations": [{
"Classification": "export",
"Properties": {"PYSPARK_PYTHON": "/usr/bin/python3"}
}]
},
# { # Section to add jars (redshift...), not used for now, since passed in spark-submit args.
# "Classification": "spark-defaults",
# "Properties": { "spark.jars": ["/home/hadoop/redshift_tbd.jar"], "spark.driver.memory": "40G", "maximizeResourceAllocation": "true"},
# }
],
JobFlowRole=self.emr_ec2_role,
ServiceRole=self.emr_role,
VisibleToAllUsers=True,
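            # Bootstrap actions run on every node as it starts, before applications are installed.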
BootstrapActions=[{
'Name': 'setup_nodes',
'ScriptBootstrapAction': {
'Path': 's3n://{}/setup_nodes.sh'.format(self.package_path_with_bucket),
'Args': []
}
}],
)
# Process response to determine if Spark cluster was started, and if so, the JobFlowId of the cluster
response_code = response['ResponseMetadata']['HTTPStatusCode']
if response_code == 200:
self.cluster_id = response['JobFlowId']
else:
terminate("Could not create EMR cluster (status code {})".format(response_code))

logger.info("Created Spark EMR cluster ({}) with cluster_id {}".format(emr_version, self.cluster_id))

    def describe_status_until_terminated(self, c):
        """Block until the cluster reaches a terminal state, polling every 30s.

        :param c: boto3 EMR client
        :return: None
        """
logger.info('Waiting for job to finish on cluster')
stop = False
while stop is False:
description = c.describe_cluster(ClusterId=self.cluster_id)
state = description['Cluster']['Status']['State']
if state == 'TERMINATED' or state == 'TERMINATED_WITH_ERRORS':
stop = True
logger.info('Job is finished')
            logger.info('Cluster state: ' + state)
time.sleep(30) # Prevent ThrottlingException by limiting number of requests

def describe_status(self, c):
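        """Log the current cluster description (for debugging)."""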
description = c.describe_cluster(ClusterId=self.cluster_id)
logger.info(f'Cluster description: {description}')

    def step_run_setup_scripts(self, c):
        """Add an EMR step that runs setup_master.sh on the master node.

        :param c: boto3 EMR client
        :return: None
        """
response = c.add_job_flow_steps(
JobFlowId=self.cluster_id,
Steps=[{
'Name': 'Run Setup',
'ActionOnFailure': 'CONTINUE',
'HadoopJarStep': {
'Jar': f's3://{self.s3_region}.elasticmapreduce/libs/script-runner/script-runner.jar',
'Args': [
"s3://{}/setup_master.sh".format(self.package_path_with_bucket),
"s3://{}".format(self.package_path_with_bucket),
]
}
}]
)
response_code = response['ResponseMetadata']['HTTPStatusCode']
if response_code == 200:
logger.debug(f"Added step 'run setup', using s3://{self.package_path_with_bucket}/setup_master.sh")
        else:
            raise Exception(f"Step couldn't be added (status code {response_code})")
time.sleep(1) # Prevent ThrottlingException

    def step_spark_submit(self, c, app_file, app_args):
        """Add an EMR step that runs spark-submit with the given job.

        :param c: boto3 EMR client
        :param app_file: the application file to submit
        :param app_args: arguments for the application
        :return: None
        """
cmd_runner_args = self.get_spark_submit_args(app_file, app_args)
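        # command-runner.jar runs arbitrary commands on the master node; here it invokes spark-submit.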

response = c.add_job_flow_steps(
JobFlowId=self.cluster_id,
Steps=[{
'Name': 'Spark Application',
'ActionOnFailure': 'CONTINUE',
'HadoopJarStep': {
'Jar': 'command-runner.jar',
'Args': cmd_runner_args
}
}]
)
response_code = response['ResponseMetadata']['HTTPStatusCode']
if response_code == 200:
logger.info("Added step 'spark-submit' with command line '{}'".format(' '.join(cmd_runner_args)))
        else:
            raise Exception(f"Step couldn't be added (status code {response_code})")
time.sleep(1) # Prevent ThrottlingException
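For context, a minimal usage sketch of how these methods chain together. The deployer class name, its constructor, the region, and the job paths are assumptions for illustration; the real orchestration lives elsewhere in yaetos.

import boto3

deployer = Deployer(...)  # hypothetical: the object exposing the methods above
c = boto3.client('emr', region_name='us-east-1')  # assumed region
deployer.start_spark_cluster(c, emr_version='emr-5.26.0')  # assumed release label
deployer.step_run_setup_scripts(c)
deployer.step_spark_submit(c, app_file='jobs/some_job.py', app_args=[])  # hypothetical job
deployer.describe_status_until_terminated(c)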
