reo katoa

Reputation: 5801

Submitting multiple Hadoop jobs through Java

I need to submit several jobs to Hadoop that are all related (hence launched by the same driver class) but completely independent of each other. Right now I start jobs like this:

int res = ToolRunner.run(new Configuration(), new MapReduceClass(params), args);

which runs a job, gets the return code, and moves on.

What I'd like to do is submit several such jobs to run in parallel, retrieving the return code of each one.

The obvious (to me) idea would be to launch several threads, each responsible for a single Hadoop job, but I'm wondering whether Hadoop has a better way to accomplish this. I don't have any experience writing concurrent code, so I'd rather not spend a lot of time learning its intricacies unless it's necessary here.
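For reference, a minimal sketch of that thread-based idea using an ExecutorService; jobParams (a hypothetical list of per-job parameter maps) and MapReduceClass are placeholder names from the question, not real API:

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;

// One worker thread per job; each thread blocks in ToolRunner.run
ExecutorService pool = Executors.newFixedThreadPool(jobParams.size());
List<Future<Integer>> results = new ArrayList<Future<Integer>>();
for (final Map<String, String> params : jobParams) {
    results.add(pool.submit(new Callable<Integer>() {
        public Integer call() throws Exception {
            // Blocks in this worker thread and returns the job's exit code
            return ToolRunner.run(new Configuration(), new MapReduceClass(params), new String[0]);
        }
    }));
}
for (Future<Integer> result : results) {
    // get() blocks until that job's thread completes (may throw ExecutionException)
    System.err.println("Job exited with code " + result.get());
}
pool.shutdown();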

Upvotes: 1

Views: 1198

Answers (2)

reo katoa

Reputation: 5801

psabbate's answer led me to find a couple of pieces of the API that I was missing. This is how I solved it:

In the driver class, start the jobs with code like this:

List<RunningJob> runningJobs = new ArrayList<RunningJob>();
for (String jobSpec: jobSpecs) {
    // Configure, for example, a params map that gets passed into the MR class's constructor
    ToolRunner.run(new Configuration(), new MapReduceClass(params, runningJobs), null);
}

for (RunningJob rj: runningJobs) {
    System.err.println("Waiting on job "+rj.getID());
    rj.waitForCompletion();
}
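Since the goal was to retrieve each job's return code, the waiting loop could also report each outcome via RunningJob.isSuccessful(), which is part of the same old-API interface:

for (RunningJob rj: runningJobs) {
    rj.waitForCompletion();
    // isSuccessful() is valid once the job has completed
    System.err.println("Job " + rj.getID() + " succeeded: " + rj.isSuccessful());
}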

Then, in MapReduceClass, define a private field List<RunningJob> runningJobs and a constructor like this:

public MapReduceClass(Map<String, String> p, List<RunningJob> rj) throws IOException {
    params = Collections.unmodifiableMap(p);
    runningJobs = rj;
}

And in the run() method that ToolRunner calls, define your JobConf and submit the job with:

JobClient jc = new JobClient();
jc.init(conf);    // connect to the cluster described by conf
jc.setConf(conf);
// submitJob() returns a RunningJob handle without waiting for completion
runningJobs.add(jc.submitJob(conf));

With this, run() returns immediately, and the jobs can be accessed via the runningJobs object in the driver class.
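For completeness, a minimal sketch of how that run() method might fit together (the job configuration lines are placeholders, not part of the original post):

// Inside MapReduceClass, which implements Tool
public int run(String[] args) throws Exception {
    JobConf conf = new JobConf(getConf(), MapReduceClass.class);
    // ... configure mapper, reducer, input and output paths here ...

    JobClient jc = new JobClient();
    jc.init(conf);
    jc.setConf(conf);
    runningJobs.add(jc.submitJob(conf)); // non-blocking submit
    return 0; // returns immediately; the driver waits on runningJobs
}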

Note that I am working with an older version of Hadoop, so jc.init(conf) and/or jc.setConf(conf) may or may not be necessary depending on your setup, though at least one of them is probably required.

Upvotes: 1

psabbate

Reputation: 777

This could have been a comment, but it involves code, so I will post it as an answer.

In the following code (from a personal project), I iterate over the input paths and submit the same job several times, one batch at a time.

Note that the boolean passed to job.waitForCompletion() only controls verbose output; the call always blocks until the job finishes. To submit several jobs without waiting on each one, use job.submit() instead.

// Process the input paths in batches of inputPathsLimit per job
while (processedInputPaths < inputPaths.length) {

    if (processedInputPaths + inputPathsLimit < inputPaths.length) {
        end = processedInputPaths + inputPathsLimit - 1;
    } else {
        end = inputPaths.length - 1;
    }
    start = processedInputPaths;

    Job job = this.createJob(configuration, inputPaths, cycle, start, end, outputPath + "/" + cycle);

    // Blocks until the job finishes; the boolean enables verbose progress output
    boolean success = job.waitForCompletion(true);

    if (success) {
        cycle++;
        processedInputPaths = end + 1;
    } else {
        LOG.info("Cycle did not end successfully: " + cycle);
        return -1;
    }

}
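For the original question's parallel case, here is a minimal sketch of non-blocking submission with the new API, assuming the same createJob() helper and batching variables as above; submit(), isComplete(), and isSuccessful() are standard org.apache.hadoop.mapreduce.Job methods:

// Submit every batch up front instead of waiting for each one
List<Job> jobs = new ArrayList<Job>();
while (processedInputPaths < inputPaths.length) {
    end = Math.min(processedInputPaths + inputPathsLimit, inputPaths.length) - 1;
    start = processedInputPaths;
    Job job = this.createJob(configuration, inputPaths, cycle, start, end, outputPath + "/" + cycle);
    job.submit(); // returns immediately, unlike waitForCompletion()
    jobs.add(job);
    cycle++;
    processedInputPaths = end + 1;
}
// Wait for all of them and collect the outcomes
for (Job job : jobs) {
    while (!job.isComplete()) {
        Thread.sleep(5000); // poll the cluster every few seconds
    }
    LOG.info("Job " + job.getJobName() + " succeeded: " + job.isSuccessful());
}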

Upvotes: 3
