duffn
duffn

Reputation: 3760

boto EMR add step and auto terminate

Python 2.7.12

boto3==1.3.1

How can I add a step to a running EMR cluster and have the cluster terminated after the step is complete, regardless of it fails or succeeds?

Create the cluster

response = client.run_job_flow(
    Name=name,
    LogUri='s3://mybucket/emr/',
    ReleaseLabel='emr-5.9.0',
    Instances={
        'MasterInstanceType': instance_type,
        'SlaveInstanceType': instance_type,
        'InstanceCount': instance_count,
        'KeepJobFlowAliveWhenNoSteps': True,
        'Ec2KeyName': 'KeyPair',
        'EmrManagedSlaveSecurityGroup': 'sg-1234',
        'EmrManagedMasterSecurityGroup': 'sg-1234',
        'Ec2SubnetId': 'subnet-1q234',
    },
    Applications=[
        {'Name': 'Spark'},
        {'Name': 'Hadoop'}
    ],
    BootstrapActions=[
        {
            'Name': 'Install Python packages',
            'ScriptBootstrapAction': {
                'Path': 's3://mybucket/code/spark/bootstrap_spark_cluster.sh'
            }
        }
    ],
    VisibleToAllUsers=True,
    JobFlowRole='EMR_EC2_DefaultRole',
    ServiceRole='EMR_DefaultRole',
    Configurations=[
        {
            'Classification': 'spark',
            'Properties': {
                'maximizeResourceAllocation': 'true'
            }
        },
    ],
)

Add a step

response = client.add_job_flow_steps(
    JobFlowId=cluster_id,
    Steps=[
        {
            'Name': 'Run Step',
            'ActionOnFailure': 'TERMINATE_CLUSTER',
            'HadoopJarStep': {
                'Args': [
                    'spark-submit',
                    '--deploy-mode', 'cluster',
                    '--py-files',
                    's3://mybucket/code/spark/spark_udfs.py',
                    's3://mybucket/code/spark/{}'.format(spark_script),
                    '--some-arg'
                ],
                'Jar': 'command-runner.jar'
            }
        }
    ]
)

This successfully adds a step and runs, however, when the step completes successfully, I would like the cluster to auto-terminate as noted in the AWS CLI: http://docs.aws.amazon.com/cli/latest/reference/emr/create-cluster.html

Upvotes: 12

Views: 12121

Answers (3)

Laren Crawford
Laren Crawford

Reputation: 669

You can create a short-lived cluster that automatically terminates after all steps have been run by specifying 'KeepJobFlowAliveWhenNoSteps': False in the Instances param. I've added a complete example to GitHub that shows how to do this.

Here's some of the code from the demo:

def run_job_flow(
        name, log_uri, keep_alive, applications, job_flow_role, service_role,
        security_groups, steps, emr_client):
    try:
        response = emr_client.run_job_flow(
            Name=name,
            LogUri=log_uri,
            ReleaseLabel='emr-5.30.1',
            Instances={
                'MasterInstanceType': 'm5.xlarge',
                'SlaveInstanceType': 'm5.xlarge',
                'InstanceCount': 3,
                'KeepJobFlowAliveWhenNoSteps': keep_alive,
                'EmrManagedMasterSecurityGroup': security_groups['manager'].id,
                'EmrManagedSlaveSecurityGroup': security_groups['worker'].id,
            },
            Steps=[{
                'Name': step['name'],
                'ActionOnFailure': 'CONTINUE',
                'HadoopJarStep': {
                    'Jar': 'command-runner.jar',
                    'Args': ['spark-submit', '--deploy-mode', 'cluster',
                             step['script_uri'], *step['script_args']]
                }
            } for step in steps],
            Applications=[{
                'Name': app
            } for app in applications],
            JobFlowRole=job_flow_role.name,
            ServiceRole=service_role.name,
            EbsRootVolumeSize=10,
            VisibleToAllUsers=True
        )
        cluster_id = response['JobFlowId']
        logger.info("Created cluster %s.", cluster_id)
    except ClientError:
        logger.exception("Couldn't create cluster.")
        raise
    else:
        return cluster_id

And here's some code that calls this function with some real params:

output_prefix = 'pi-calc-output'
pi_step = {
    'name': 'estimate-pi-step',
    'script_uri': f's3://{bucket_name}/{script_key}',
    'script_args':
        ['--partitions', '3', '--output_uri',
        f's3://{bucket_name}/{output_prefix}']
}
cluster_id = emr_basics.run_job_flow(
    f'{prefix}-cluster', f's3://{bucket_name}/logs',
    False, ['Hadoop', 'Hive', 'Spark'], job_flow_role, service_role,
    security_groups, [pi_step], emr_client)

Upvotes: 0

Rene B.
Rene B.

Reputation: 7364

The 'AutoTerminate': True parameter as suggested did not work for me. However, it worked when I set the parameter 'KeepJobFlowAliveWhenNoSteps' from True to False. Your Code should look then as the following:

response = client.run_job_flow(
    Name=name,
    LogUri='s3://mybucket/emr/',
    ReleaseLabel='emr-5.9.0',
    Instances={
        'MasterInstanceType': instance_type,
        'SlaveInstanceType': instance_type,
        'InstanceCount': instance_count,
        'KeepJobFlowAliveWhenNoSteps': False,
        'Ec2KeyName': 'KeyPair',
        'EmrManagedSlaveSecurityGroup': 'sg-1234',
        'EmrManagedMasterSecurityGroup': 'sg-1234',
        'Ec2SubnetId': 'subnet-1q234',
    },
    Applications=[
        {'Name': 'Spark'},
        {'Name': 'Hadoop'}
    ],
    BootstrapActions=[
        {
            'Name': 'Install Python packages',
            'ScriptBootstrapAction': {
                'Path': 's3://mybucket/code/spark/bootstrap_spark_cluster.sh'
            }
        }
    ],
    VisibleToAllUsers=True,
    JobFlowRole='EMR_EC2_DefaultRole',
    ServiceRole='EMR_DefaultRole',
    Configurations=[
        {
            'Classification': 'spark',
            'Properties': {
                'maximizeResourceAllocation': 'true'
            }
        },
    ],
)

Upvotes: 0

nicor88
nicor88

Reputation: 1553

In your case (creating the cluster using boto3) you can add these flags 'TerminationProtected': False, 'AutoTerminate': True, to your cluster creation. In this way after your step finished to run the cluster will be shut-down.

Another solution is to add another step to kill the cluster immediately after the step that you want to run. So basically you need to run this command as step

aws emr terminate-clusters --cluster-ids your_cluster_id

The tricky part is to retrive the cluster_id. Here you can find some solution: Does an EMR master node know it's cluster id?

Upvotes: 8

Related Questions