flybonzai

Reputation: 3931

How do you automate pyspark jobs on emr using boto3 (or otherwise)?

I am creating a job to parse massive amounts of server data, and then upload it into a Redshift database.

My job flow is as follows:

I'm getting hung up on how to automate this, though, so that my process spins up an EMR cluster, installs the required programs via bootstrap actions, and runs my Python script containing the parsing and writing code.

Does anyone have any examples, tutorials, or experience they could share with me to help me learn how to do this?

Upvotes: 17

Views: 20923

Answers (4)

Laren Crawford

Reputation: 669

I put a complete example on GitHub that shows how to do all of this with Boto3.

The long-lived cluster example shows how to create a cluster, run job steps that grab historical Amazon review data from a public S3 bucket, do some PySpark processing on it, and write the output back to an S3 bucket (a minimal Boto3 sketch of the add-a-step-and-wait pattern appears after the list). The demo does the following:

  • Creates an Amazon S3 bucket and uploads a job script.
  • Creates AWS Identity and Access Management (IAM) roles used by the demo.
  • Creates Amazon Elastic Compute Cloud (Amazon EC2) security groups used by the demo.
  • Creates short-lived and long-lived clusters and runs job steps on them.
  • Terminates clusters and cleans up all resources.
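
Separate from the demo, a minimal sketch of adding a PySpark step to an already-running cluster and blocking until it finishes might look like this; the cluster ID, script location, and step name below are placeholders, not values from the demo:

import boto3

emr = boto3.client('emr', region_name='us-east-1')

# Placeholder values -- substitute your own cluster ID and script URI.
cluster_id = 'j-XXXXXXXXXXXXX'
script_uri = 's3://my-bucket/spark/my_job.py'

# Submit a PySpark step to the running cluster.
response = emr.add_job_flow_steps(
    JobFlowId=cluster_id,
    Steps=[{
        'Name': 'Run PySpark job',
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': ['spark-submit', '--deploy-mode', 'cluster', script_uri],
        },
    }],
)
step_id = response['StepIds'][0]

# Block until the step completes (raises if the step fails or times out).
emr.get_waiter('step_complete').wait(ClusterId=cluster_id, StepId=step_id)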

Upvotes: 0

CpILL

Reputation: 6989

Actually, I've gone with AWS Step Functions, which is a state machine wrapper for Lambda functions. You can use boto3 to start the EMR Spark job with run_job_flow, use describe_cluster to get the status of the cluster, and then branch with a Choice state. So your state machine looks something like this (Step Functions state types in brackets):

Run job (task) -> Wait for X min (wait) -> Check status (task) -> Branch (choice) [ => back to wait, or => done ]
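
A minimal sketch of the "Check status" Lambda task, assuming the state machine input carries the ClusterId returned by the earlier run_job_flow task (that key name is just an assumption here):

import boto3

emr = boto3.client('emr')

def lambda_handler(event, context):
    # 'ClusterId' is assumed to be passed through from the run_job_flow task.
    cluster_id = event['ClusterId']
    state = emr.describe_cluster(ClusterId=cluster_id)['Cluster']['Status']['State']
    # The Choice state can branch on ClusterState, e.g. loop back to the
    # Wait state until the cluster reaches WAITING or TERMINATED.
    return {'ClusterId': cluster_id, 'ClusterState': state}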

Upvotes: 1

Kyle Bridenstine

Reputation: 6373

You can do this using AWS Data Pipeline. Set up your S3 bucket to trigger a Lambda function every time a new file lands in the bucket (https://docs.aws.amazon.com/lambda/latest/dg/with-s3-example.html). The Lambda function then activates your Data Pipeline (https://aws.amazon.com/blogs/big-data/using-aws-lambda-for-event-driven-data-processing-pipelines/). The Data Pipeline spins up a new EMR cluster using EmrCluster (where you can specify your bootstrap options), runs your EMR commands using EmrActivity, and when it's all done it terminates the EMR cluster and deactivates the Data Pipeline.
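
A minimal sketch of the Lambda piece that activates an existing pipeline when the S3 event fires; the pipeline ID below is a placeholder for one you have already defined:

import boto3

datapipeline = boto3.client('datapipeline')

def lambda_handler(event, context):
    # Triggered by the S3 PUT event; activate the pre-defined pipeline.
    # 'df-XXXXXXXXXXXX' is a placeholder for your real pipeline ID.
    datapipeline.activate_pipeline(pipelineId='df-XXXXXXXXXXXX')
    return {'status': 'pipeline activated'}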

Upvotes: 2

Kamil Sindi

Reputation: 22822

Take a look at the boto3 EMR docs to create the cluster. You essentially have to call run_job_flow and create steps that run the program you want.

import boto3

client = boto3.client('emr', region_name='us-east-1')

S3_BUCKET = 'MyS3Bucket'
S3_KEY = 'spark/main.py'
S3_URI = 's3://{bucket}/{key}'.format(bucket=S3_BUCKET, key=S3_KEY)

# upload file to an S3 bucket
s3 = boto3.resource('s3')
s3.meta.client.upload_file("myfile.py", S3_BUCKET, S3_KEY)

response = client.run_job_flow(
    Name="My Spark Cluster",
    ReleaseLabel='emr-4.6.0',
    Instances={
        'MasterInstanceType': 'm4.xlarge',
        'SlaveInstanceType': 'm4.xlarge',
        'InstanceCount': 4,
        'KeepJobFlowAliveWhenNoSteps': True,
        'TerminationProtected': False,
    },
    Applications=[
        {
            'Name': 'Spark'
        }
    ],
    BootstrapActions=[
        {
            'Name': 'Maximize Spark Default Config',
            'ScriptBootstrapAction': {
                'Path': 's3://support.elasticmapreduce/spark/maximize-spark-default-config',
            }
        },
    ],
    Steps=[
        {
            'Name': 'Setup Debugging',
            'ActionOnFailure': 'TERMINATE_CLUSTER',
            'HadoopJarStep': {
                'Jar': 'command-runner.jar',
                'Args': ['state-pusher-script']
            }
        },
        {
            'Name': 'setup - copy files',
            'ActionOnFailure': 'CANCEL_AND_WAIT',
            'HadoopJarStep': {
                'Jar': 'command-runner.jar',
                'Args': ['aws', 's3', 'cp', S3_URI, '/home/hadoop/']
            }
        },
        {
            'Name': 'Run Spark',
            'ActionOnFailure': 'CANCEL_AND_WAIT',
            'HadoopJarStep': {
                'Jar': 'command-runner.jar',
                'Args': ['spark-submit', '/home/hadoop/main.py']
            }
        }
    ],
    VisibleToAllUsers=True,
    JobFlowRole='EMR_EC2_DefaultRole',
    ServiceRole='EMR_DefaultRole'
)

You can also add steps to a running cluster if you know the job flow id:

job_flow_id = response['JobFlowId']
print("Job flow ID:", job_flow_id)

step_response = client.add_job_flow_steps(JobFlowId=job_flow_id, Steps=SomeMoreSteps)

step_ids = step_response['StepIds']

print("Step IDs:", step_ids)

For more configurations, check out sparksteps.

Upvotes: 37
