Ankur Shrivastava

Reputation: 243

Call multiple Spark jobs within a single EMR cluster

I want to run multiple Spark jobs with spark-submit on a single EMR cluster. Does EMR support this, and how can I achieve it? At the moment I use AWS Lambda to launch an EMR job for a single Spark job, but we would like to extend this to multiple Spark jobs on one EMR cluster.

Upvotes: 1

Views: 2184

Answers (1)

Assafs

Reputation: 3275

You can run multiple Spark jobs on one EMR cluster sequentially; that is, each job is launched only after the previous one completes. This is done with EMR steps.

I used the Java SDK for this, but the EMR documentation shows how to add a step using only the CLI (aws emr add-steps).

My code below uses spark-submit, but it is not invoked directly as you would invoke it from the command line. Instead I run it through a shell (/bin/sh -c) so that I can set the HADOOP_USER_NAME environment variable, which makes the Spark job run under the username I specify. You can skip this wrapper if you want the job to run under the user you log into EMR with (hadoop, by default).
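To make the shell wrapper concrete, here is a minimal standalone sketch of how such an argument list is laid out (it is not the answer's actual code, which appears further down). The class name ShellEnvelopeDemo, the method wrapWithUser and the user name etl_user are hypothetical names chosen for illustration. The sketch only builds and prints the list; it does not call EMR.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ShellEnvelopeDemo {

    // Builds: /bin/sh -c 'HADOOP_USER_NAME=<user> spark-submit "$@"' sh <args...>
    // Assumption: userName comes from a trusted source, because it is pasted
    // into a shell command string without any escaping.
    public static List<String> wrapWithUser(String userName, List<String> sparkSubmitArgs) {
        List<String> wrapped = new ArrayList<>();
        wrapped.add("/bin/sh");
        wrapped.add("-c");
        // The VAR=value prefix sets the variable only for this one command.
        wrapped.add("HADOOP_USER_NAME=" + userName + " spark-submit \"$@\"");
        // The first word after the command string becomes $0 (a placeholder).
        wrapped.add("sh");
        // Everything after it becomes $1, $2, ... and is expanded by "$@".
        wrapped.addAll(sparkSubmitArgs);
        return wrapped;
    }

    public static void main(String[] args) {
        List<String> step = wrapWithUser("etl_user",
                Arrays.asList("--deploy-mode", "cluster", "--class", "com.example.Main"));
        for (String part : step) {
            System.out.println(part);
        }
    }
}
```

Without the placeholder entry, the first spark-submit argument would be consumed as $0 and silently dropped from "$@", which is why the extra "sh" is there.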

In the code excerpt below, the object emr is of type AmazonElasticMapReduce, which the SDK provides. If you use the CLI approach you will not need it.

Some helper methods, such as uploadConfFile, are self-explanatory. I used an extensive configuration for the Spark application, and unlike the files and jars, which can be local or in S3/HDFS, the configuration file must be a local file on the EMR cluster itself.

When you finish, you will have created a step on your EMR cluster that launches a new Spark application. You can add many steps to the cluster, and they will run one after the other.

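//Imports used by this excerpt (AWS SDK for Java 1.x)
import java.util.ArrayList;
import java.util.List;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce;
import com.amazonaws.services.elasticmapreduce.model.ActionOnFailure;
import com.amazonaws.services.elasticmapreduce.model.AddJobFlowStepsRequest;
import com.amazonaws.services.elasticmapreduce.model.AddJobFlowStepsResult;
import com.amazonaws.services.elasticmapreduce.model.HadoopJarStepConfig;
import com.amazonaws.services.elasticmapreduce.model.StepConfig;

//Upload the spark configuration you wish to use to a local file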
uploadConfFile(clusterId, sparkConf, confFileName);

//create a list of arguments - which is the complete command for spark-submit
List<String> stepargs = new ArrayList<String>();
//start with an envelope to specify the hadoop user name
stepargs.add("/bin/sh");
stepargs.add("-c");
//call spark-submit; "$@" expands to every argument added after the "sh" entry below
stepargs.add("HADOOP_USER_NAME="+task.getUserName()+" spark-submit \"$@\"");
//placeholder that becomes $0, so the arguments that follow become $1, $2, ...
stepargs.add("sh");
//add the spark-submit arguments
stepargs.add("--class");
stepargs.add(mainClass);
stepargs.add("--deploy-mode");
stepargs.add("cluster");
stepargs.add("--master");
stepargs.add("yarn");
stepargs.add("--files");
//a comma-separated list of file paths in s3
stepargs.add(files);
stepargs.add("--jars");
//a comma-separated list of file paths in s3
stepargs.add(jars);
stepargs.add("--properties-file");
//the file we uploaded to the EMR, with its full path
stepargs.add(confFileName);
stepargs.add(jar);
//add the jar specific arguments in here

AddJobFlowStepsResult result = emr.addJobFlowSteps(new AddJobFlowStepsRequest()
    .withJobFlowId(clusterId)
    .withSteps(new StepConfig()
            .withName(name)
            .withActionOnFailure(ActionOnFailure.CONTINUE)
            .withHadoopJarStep(new HadoopJarStepConfig()
                    .withJar("command-runner.jar")
                    .withArgs(stepargs))));
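
Since the question is about several jobs, one way to extend the excerpt above is to build one argument list per job and turn each list into its own step. The following is a minimal sketch of the plain-Java part only, assuming you call spark-submit directly through command-runner.jar (without the HADOOP_USER_NAME wrapper). The names MultiStepArgsDemo, SparkJob, buildStepArgs and buildAllSteps, as well as the job names and S3 paths, are hypothetical and not part of the AWS SDK. The SDK calls are shown only as a comment because they need the SDK on the classpath.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class MultiStepArgsDemo {

    /** Hypothetical description of one Spark job; not an AWS SDK type. */
    public static final class SparkJob {
        final String name;
        final String mainClass;
        final String jar;
        final List<String> appArgs;

        public SparkJob(String name, String mainClass, String jar, List<String> appArgs) {
            this.name = name;
            this.mainClass = mainClass;
            this.jar = jar;
            this.appArgs = appArgs;
        }
    }

    // Builds the full spark-submit command for one job as an argument list.
    public static List<String> buildStepArgs(SparkJob job) {
        List<String> args = new ArrayList<>();
        args.add("spark-submit");
        args.add("--deploy-mode");
        args.add("cluster");
        args.add("--master");
        args.add("yarn");
        args.add("--class");
        args.add(job.mainClass);
        args.add(job.jar);
        args.addAll(job.appArgs); // application-specific arguments go last
        return args;
    }

    // Maps step name -> argument list, keeping the order in which jobs were given.
    // Assumes job names are unique; a repeated name would overwrite the earlier entry.
    public static Map<String, List<String>> buildAllSteps(List<SparkJob> jobs) {
        Map<String, List<String>> steps = new LinkedHashMap<>();
        for (SparkJob job : jobs) {
            steps.put(job.name, buildStepArgs(job));
        }
        return steps;
    }

    public static void main(String[] args) {
        List<SparkJob> jobs = Arrays.asList(
                new SparkJob("ingest", "com.example.Ingest", "s3://my-bucket/ingest.jar",
                        Arrays.asList("2024-01-01")),
                new SparkJob("report", "com.example.Report", "s3://my-bucket/report.jar",
                        Arrays.asList("--verbose")));
        // With the AWS SDK available, each entry could become one StepConfig
        // (withName, withActionOnFailure, withHadoopJarStep using command-runner.jar
        // and withArgs), and all of them could be passed to
        // AddJobFlowStepsRequest.withSteps(...) in a single request.
        for (Map.Entry<String, List<String>> step : buildAllSteps(jobs).entrySet()) {
            System.out.println(step.getKey() + " -> " + String.join(" ", step.getValue()));
        }
    }
}
```

Keeping the steps in an insertion-ordered map mirrors the order in which you would want them queued on the cluster.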

Upvotes: 0
