Reputation: 3931
I am creating a job to parse massive amounts of server data, and then upload it into a Redshift
database.
My job flow is as follows: use Spark dataframes or Spark SQL to parse the data and write it back out to S3.
I'm getting hung up on how to automate this, though, so that my process spins up an EMR cluster, bootstraps the correct programs for installation, and runs my Python script that will contain the code for parsing and writing.
Does anyone have any examples, tutorials, or experience they could share with me to help me learn how to do this?
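For context, the parse-and-write step itself could look something like this minimal PySpark sketch; the bucket names, JSON input format, and column names are assumptions, not part of my actual job:
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("parse-server-data").getOrCreate()

# Read the raw server data from S3 (assumed here to be JSON lines).
raw = spark.read.json("s3://my-input-bucket/logs/")

# Example parse step: keep a few columns and drop malformed rows.
parsed = (raw
          .select("timestamp", "host", "status", "message")
          .where(F.col("status").isNotNull()))

# Write the result back to S3 in a Redshift-friendly columnar format.
parsed.write.mode("overwrite").parquet("s3://my-output-bucket/parsed/")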
Upvotes: 17
Views: 20923
Reputation: 669
I put a complete example on GitHub that shows how to do all of this with Boto3.
The long-lived cluster example shows how to create and run job steps on a cluster: the steps grab data from a public S3 bucket of historical Amazon review data, do some PySpark processing on it, and write the output back to an S3 bucket.
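For illustration, here is a hedged Boto3 sketch of submitting a PySpark step to an already-running (long-lived) cluster and waiting for it to finish; the cluster ID and script location are placeholders, not values from the linked example:
import boto3

emr = boto3.client('emr', region_name='us-east-1')

CLUSTER_ID = 'j-XXXXXXXXXXXXX'                              # placeholder cluster ID
SCRIPT_URI = 's3://my-bucket/scripts/process_reviews.py'    # placeholder script location

# Submit a spark-submit step to the running cluster...
response = emr.add_job_flow_steps(
    JobFlowId=CLUSTER_ID,
    Steps=[{
        'Name': 'Process review data with PySpark',
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': ['spark-submit', '--deploy-mode', 'cluster', SCRIPT_URI],
        },
    }],
)

# ...and block until the step completes.
emr.get_waiter('step_complete').wait(
    ClusterId=CLUSTER_ID,
    StepId=response['StepIds'][0],
)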
Upvotes: 0
Reputation: 6989
Actually, I've gone with AWS Step Functions, which is a state machine wrapper for Lambda functions. You can use boto3 to start the EMR Spark job with run_job_flow, use describe_cluster to get the status of the cluster, and then use a Choice state to branch on it. So your step functions look something like this (state types in brackets):
Run job (task) -> Wait for X min (wait) -> Check status (task) -> Branch (choice) [ => back to wait, or => done ]
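As a rough sketch of the "Check status" task (the handler name and event shape are assumptions; the "Run job" task would call run_job_flow and pass the returned JobFlowId along as ClusterId):
import boto3

emr = boto3.client('emr')

def check_status_handler(event, context):
    # Lambda behind the "Check status" Task state.
    # Assumes the "Run job" task put the cluster ID into the state input.
    response = emr.describe_cluster(ClusterId=event['ClusterId'])
    state = response['Cluster']['Status']['State']
    # Possible values include STARTING, RUNNING, WAITING,
    # TERMINATED and TERMINATED_WITH_ERRORS; the Choice state branches on this.
    return {'ClusterId': event['ClusterId'], 'ClusterState': state}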
Upvotes: 1
Reputation: 6373
Just do this using AWS Data Pipeline. You can set up your S3 bucket to trigger a Lambda function every time a new file is placed inside the bucket: https://docs.aws.amazon.com/lambda/latest/dg/with-s3-example.html. That Lambda function then activates your Data Pipeline (https://aws.amazon.com/blogs/big-data/using-aws-lambda-for-event-driven-data-processing-pipelines/), the Data Pipeline spins up a new EMR cluster using EmrCluster (where you can specify your bootstrap options), and it runs your EMR commands using EmrActivity. When it's all done, it terminates your EMR cluster and deactivates the Data Pipeline.
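A minimal sketch of that Lambda function, assuming the Data Pipeline has already been defined (the pipeline ID below is a placeholder):
import boto3

datapipeline = boto3.client('datapipeline')

PIPELINE_ID = 'df-XXXXXXXXXXXXX'  # placeholder; use your pipeline's ID

def lambda_handler(event, context):
    # Triggered by the S3 event; activating the pipeline kicks off the
    # EmrCluster / EmrActivity objects defined in the pipeline.
    return datapipeline.activate_pipeline(pipelineId=PIPELINE_ID)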
Upvotes: 2
Reputation: 22822
Take a look at the boto3 EMR docs to create the cluster. You essentially have to call run_job_flow and create steps that run the program you want.
import boto3
client = boto3.client('emr', region_name='us-east-1')
S3_BUCKET = 'MyS3Bucket'
S3_KEY = 'spark/main.py'
S3_URI = 's3://{bucket}/{key}'.format(bucket=S3_BUCKET, key=S3_KEY)
# upload file to an S3 bucket
s3 = boto3.resource('s3')
s3.meta.client.upload_file("myfile.py", S3_BUCKET, S3_KEY)
# Create the cluster and run the steps in a single call.
response = client.run_job_flow(
    Name="My Spark Cluster",
    ReleaseLabel='emr-4.6.0',
    # Cluster hardware; KeepJobFlowAliveWhenNoSteps=True keeps the cluster
    # running after the steps finish so you can add more steps later.
    Instances={
        'MasterInstanceType': 'm4.xlarge',
        'SlaveInstanceType': 'm4.xlarge',
        'InstanceCount': 4,
        'KeepJobFlowAliveWhenNoSteps': True,
        'TerminationProtected': False,
    },
    Applications=[
        {
            'Name': 'Spark'
        }
    ],
    BootstrapActions=[
        {
            'Name': 'Maximize Spark Default Config',
            'ScriptBootstrapAction': {
                'Path': 's3://support.elasticmapreduce/spark/maximize-spark-default-config',
            }
        },
    ],
    Steps=[
        {
            # Enable EMR step debugging.
            'Name': 'Setup Debugging',
            'ActionOnFailure': 'TERMINATE_CLUSTER',
            'HadoopJarStep': {
                'Jar': 'command-runner.jar',
                'Args': ['state-pusher-script']
            }
        },
        {
            # Copy the PySpark script from S3 onto the master node.
            'Name': 'setup - copy files',
            'ActionOnFailure': 'CANCEL_AND_WAIT',
            'HadoopJarStep': {
                'Jar': 'command-runner.jar',
                'Args': ['aws', 's3', 'cp', S3_URI, '/home/hadoop/']
            }
        },
        {
            # Run the script with spark-submit.
            'Name': 'Run Spark',
            'ActionOnFailure': 'CANCEL_AND_WAIT',
            'HadoopJarStep': {
                'Jar': 'command-runner.jar',
                'Args': ['spark-submit', '/home/hadoop/main.py']
            }
        }
    ],
    VisibleToAllUsers=True,
    JobFlowRole='EMR_EC2_DefaultRole',
    ServiceRole='EMR_DefaultRole'
)
You can also add steps to a running cluster if you know the job flow id:
job_flow_id = response['JobFlowId']
print("Job flow ID:", job_flow_id)
step_response = client.add_job_flow_steps(JobFlowId=job_flow_id, Steps=SomeMoreSteps)
step_ids = step_response['StepIds']
print("Step IDs:", step_ids)
For more configurations, check out sparksteps.
Upvotes: 37