Reputation: 400
I came across "chaining of mapreduce jobs." Being new to mapreduce, under what circumstances do we have to chain (I am assuming chaining means running mapreduce jobs one after the other sequentially) jobs?
And are there any examples that could help?
Upvotes: 3
Views: 14011
Reputation: 4067
A classic example of a job that has to be chained is a word count that outputs the words sorted by their frequency.
You will need:
Job 1: a word-count job that counts how many times each word occurs and writes the (word, count) pairs to a sequence file.
Job 2: a job that swaps key and value and sorts the records by count in descending order.
Here is an example of the mappers and reducers for both jobs:
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

public class HadoopWordCount {

    // Job 1 mapper: splits each line into words and emits (word, 1).
    public static class TokenizerMapper extends Mapper<Object, Text, Text, LongWritable> {
        private final static Text word = new Text();
        private final static LongWritable one = new LongWritable(1);

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    // Job 2 mapper: swaps key and value so that the records can be sorted by count.
    public static class KeyValueSwappingMapper extends Mapper<Text, LongWritable, LongWritable, Text> {
        public void map(Text key, LongWritable value, Context context) throws IOException, InterruptedException {
            context.write(value, key);
        }
    }

    // Job 1 combiner/reducer: sums the counts emitted for each word.
    public static class SumReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
        private LongWritable result = new LongWritable();

        public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException,
                InterruptedException {
            long sum = 0;
            for (LongWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }
Here is an example of the driver program. It expects two arguments: an input directory of text files and an output directory (job 1 writes to <output>/out1, and job 2 writes the final result to <output>/out2).
    // Driver: runs the word-count job, then the sort-by-frequency job on its output.
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path out = new Path(args[1]);

        // Job 1: count how often each word occurs and write the result as a sequence file.
        Job job1 = Job.getInstance(conf, "word count");
        job1.setJarByClass(HadoopWordCount.class);
        job1.setMapperClass(TokenizerMapper.class);
        job1.setCombinerClass(SumReducer.class);
        job1.setReducerClass(SumReducer.class);
        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(LongWritable.class);
        job1.setOutputFormatClass(SequenceFileOutputFormat.class);
        FileInputFormat.addInputPath(job1, new Path(args[0]));
        FileOutputFormat.setOutputPath(job1, new Path(out, "out1"));
        if (!job1.waitForCompletion(true)) {
            System.exit(1);
        }

        // Job 2: read job 1's output, swap key and value, and sort by count in descending order.
        Job job2 = Job.getInstance(conf, "sort by frequency");
        job2.setJarByClass(HadoopWordCount.class);
        job2.setMapperClass(KeyValueSwappingMapper.class);
        job2.setNumReduceTasks(1);
        job2.setSortComparatorClass(LongWritable.DecreasingComparator.class);
        job2.setOutputKeyClass(LongWritable.class);
        job2.setOutputValueClass(Text.class);
        job2.setInputFormatClass(SequenceFileInputFormat.class);
        FileInputFormat.addInputPath(job2, new Path(out, "out1"));
        FileOutputFormat.setOutputPath(job2, new Path(out, "out2"));
        if (!job2.waitForCompletion(true)) {
            System.exit(1);
        }
    }
}
Upvotes: 17
Reputation: 3973
Simply put, you have to chain multiple MapReduce jobs when your problem cannot be solved by a single MapReduce job.
A good example is finding the top 10 most-bought items; this can be achieved with two jobs:
A MapReduce job to find how many times each item was bought.
A second job that sorts the items by the number of times they were bought and keeps the top 10 (see the sketch after this list).
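A minimal sketch of the second job's reduce side, assuming the first job emits (item, count) pairs, a key-swapping mapper like the one in the answer above turns them into (count, item), and the job runs a single reduce task with a descending sort comparator; the class name Top10Reducer is illustrative and not from the original answer:
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Hypothetical reducer for the second job: keys (purchase counts) arrive in
// descending order, so writing the first 10 records yields the top 10 items.
public class Top10Reducer extends Reducer<LongWritable, Text, Text, LongWritable> {

    private static final int LIMIT = 10;
    private int emitted = 0;

    @Override
    public void reduce(LongWritable count, Iterable<Text> items, Context context)
            throws IOException, InterruptedException {
        for (Text item : items) {
            if (emitted >= LIMIT) {
                return; // already have the top 10
            }
            context.write(item, count);
            emitted++;
        }
    }
}
Because the keys arrive at the single reducer already sorted in descending order, cutting off after 10 output records is enough; no further sorting of the output is needed.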
Keep in mind that job chaining generates intermediate files that are written to, and read from, disk, so it will decrease performance. Try to avoid chaining jobs where possible.
And here is how to chain jobs.
Upvotes: 0