Reputation: 958
I am trying to learn MapReduce and quite new to it. I studied the optimization that Combiner provides by reducing the mapper output at the data node level itself.
Now, it is understandable that the mapper output key/val and combiner input key/value needs to be the same. But I can't digest the fact that combiner output key/value and mapper output key/Val needs to be the same.
If I want to find the average of data which is in the form Name, Price then I will probably choose below:
Mapper<LongWritable, Text, Text, IntWritable>
Combiner<Text, IntWritable, Text, FloatWritable>
Reducer<Text, IntWritable, Text, FloatWritable>
By doing this I am getting errors and when I read online I found the output of Mapper and Combiner needs to be the same but couldn't find a reason for it.
Below is my sample data:
Schema - cid,cname,email,date,pid,pname,price
101,jai,[email protected],1-aug-2016,1,iphone,65000
101,jai,[email protected],1-aug-2016,2,ipad,35000
101,jai,[email protected],1-aug-2016,3,Samsung S5,34000
Below is my code:
import java.io.IOException;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
public class q1 {
//cid,cname,email,date,pid,pname,price
public static class avg_mapper extends Mapper<LongWritable, Text, Text, IntWritable>{
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{
String[] line = value.toString().split(",");
Text cname = new Text(line[1]);
IntWritable price = new IntWritable(Integer.parseInt(line[6]));
context.write(cname, price);
}
}
public static class avg_reducer extends Reducer<Text, IntWritable, Text, FloatWritable>{
public void reduce(Text key, Iterable<IntWritable> value, Context context) throws IOException, InterruptedException{
int sum = 0;
int count=0;
for (IntWritable val : value){
count+=1;
sum+=val.get();
}
Float avg = (float)sum/count;
context.write(key,new FloatWritable(avg));
}
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
Job job = new Job(conf, "Average");
job.setJarByClass(q1.class);
job.setMapperClass(avg_mapper.class);
job.setReducerClass(avg_reducer.class);
job.setCombinerClass(avg_reducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FloatWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0:1);
}
}
Below is the error I am getting:
Error: java.io.IOException: wrong value class: class org.apache.hadoop.io.FloatWritable is not class org.apache.hadoop.io.IntWritable
at org.apache.hadoop.mapred.IFile$Writer.append(IFile.java:194)
at org.apache.hadoop.mapred.Task$CombineOutputCollector.collect(Task.java:1374)
at org.apache.hadoop.mapred.Task$NewCombinerRunner$OutputConverter.write(Task.java:1691)
at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
at org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer$Context.write(WrappedReducer.java:105)
at q1$avg_reducer.reduce(q1.java:34)
at q1$avg_reducer.reduce(q1.java:1)
at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:171)
at org.apache.hadoop.mapred.Task$NewCombinerRunner.combine(Task.java:1712)
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpill(MapTask.java:1641)
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.flush(MapTask.java:1492)
at org.apache.hadoop.mapred.MapTask$NewOutputCollector.close(MapTask.java:729)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:799)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:164)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1917)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
I am trying to find the average price for each cname. Any help is appreciated.
Upvotes: 0
Views: 990
Reputation: 2252
But I can't digest the fact that combiner output key/value and mapper output key/Val needs to be the same.
It is very simple the input type of reducer is not changed so in your case it is always (Text, IntWritable)
. reducer does not care how this input is provided to it. It always wants the input to be of type (Text, IntWritable)
so both the output of mapper and combiner should be the same and should be (Text, IntWritable)
.
But The first thing that you should know is that you should never put some logic of your application in combiner in mapreduce.There is no grantee that the combiner will be run when Hadoop is running your job. And also Hadoop may run the combiner more than once when executing job.
So what's the purpose of combiner?
The only goal of combiner is to reduce the amount of data being sent from machine that are executing mapper tasks to machines that will run reducer tasks. If you want to write a combiner you should design it in a way that the number of time this combiner will be executed in mapreeduce does not affect the output of your application.
Now for a moment think that you have changed the output type of your map so that it can be it can be run without error. Is there any other problem with your application? Definitely Yes.
Imagine you have this input:
101,jai,[email protected],1-aug-2016,1,iphone,65000
101,jai,[email protected],1-aug-2016,2,ipad,35000
101,jai,[email protected],1-aug-2016,3,Samsung S5,34000
So map output will be like this:
jai -> 65000
jai -> 35000
jai -> 34000
Now imagine reducer input in two different scenario:
First scenario combiner is not executed at all:
jai -> 65000
jai -> 35000
jai -> 34000
in this case reducer output will be :
jai -> 44666.666666666664
Second scenario combiner is executed on two first elements in mapper output:
jai -> 50000 // combiner executed on the first two item above and produce jai -> (65000 + 35000) / 2
jai -> 34000 // the third is sent to the reducer without combiner executed on it
in this case the output of reducer will be:
jai -> 67000 // (50000 + 34000) / 2
It is clear that the result of your application will depend on the number of times combiner will be executed.
One workaround would be to assign weigh to the value that is being sent to reducer and combiner for example for the same input above the output of reducer will be like this:
jai -> 1-65000 // this shows both weigh and value separated by dash(-)
jai -> 1-35000
jai -> 1-34000
Now imagine first scenario that combiner is not executed at all:
In this case reducer input will be the output of mapper above so the output of reducer will be:
jai -> 3-44666.666666666664
And the second scenario when combiner is executed on two first element so combiner output will be like this:
jai -> 2-50000 // this is jai -> 2 - (65000 + 35000) / 2
jai -> 1-34000
So the reducer output will be:
jai -> 3-44666.666666666664 // 3 - (2 * 50000) + (1 * 34000) / 3
This way no matter how many times your combiner will run the output of your application will always be the same.
Implementation:
There are many approaches to implement this solution in mapreduce. You can either define your own Writable type to hold both weigh and average or use Simple Text to and separate them with dash character(-). I went for the second one for simplicity.
Here is the mapper implementation:
public class AverageMapper extends Mapper<LongWritable, Text, Text, Text> {
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String[] line = value.toString().split(",");
Text cname = new Text(line[1]);
context.write(cname, new Text(1 + "-" + String.valueOf(line[6])));
}
}
And here is the reducer implementation:
public class AverageReducer extends Reducer<Text, Text, Text, Text> {
@Override
protected void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
double sum = 0D;
long count = 0L;
long elementCount;
for(Text value : values) {
String str = new String(value.copyBytes());
String[] result = str.split("-");
elementCount = Long.valueOf(result[0]);
count += elementCount;
sum += elementCount * Double.valueOf(result[1]);
}
context.write(key, new Text(String.valueOf(count + "-" + (sum / count))));
}
}
Note that sometimes there will be small difference between result (because of floating point rounding problem) when combiner will be executed different times but that is acceptable and that will not be a significant difference.
Upvotes: 1