Reputation: 31
I'm trying to run a simple MapReduce program where the mapper writes two different values for the same key, but they always end up being the same by the time I collect them in the reducer.
Here is my code:
public class kaka {
    public static class Mapper4 extends Mapper<Text, Text, Text, Text>{
        public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
            context.write(new Text("a"),new Text("b"));
            context.write(new Text("a"),new Text("c"));
        }
    }
    public static class Reducer4 extends Reducer<Text,Text,Text,Text> {
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            Vector<Text> vals = new Vector<Text>();
            for (Text val : values){
                vals.add(val);
            }
            return;
        }
    }
    public static void main(String[] args) throws Exception {
        //deleteDir(new File("eran"));//todo
        Configuration conf = new Configuration();
        conf.set("mapred.map.tasks","10"); // asking for more mappers (it's a recommendation)
        conf.set("mapred.max.split.size","1000000"); // set default size of input split. 1000 means 1000 bytes.
        Job job1 = new Job(conf, "find most similar words");
        job1.setJarByClass(kaka.class);
        job1.setInputFormatClass(SequenceFileInputFormat.class);
        job1.setMapperClass(Mapper4.class);
        job1.setReducerClass(Reducer4.class);
        job1.setOutputFormatClass(SequenceFileOutputFormat.class);
        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job1, new Path("vectors/part-r-00000"));
        FileOutputFormat.setOutputPath(job1, new Path("result"));
        job1.waitForCompletion(true);
        System.exit(job1.waitForCompletion(true) ? 0 : 1);
    }
}
Upvotes: 3
Views: 1575
Reputation: 30089
You're being caught out by object reuse while iterating over the values in the reducer. A JIRA patch a long time ago improved efficiency by reusing objects: the key/value objects passed to your mapper and the key/value objects passed to your reducer are always the same underlying object references, and only the contents of those objects change with each iteration. Your Vector therefore ends up holding several references to one Text object, which contains whatever value was read last.
Amend your code to make a copy of the value before adding it to the Vector:
public static class Reducer4 extends Reducer<Text,Text,Text,Text> {
    public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        Vector<Text> vals = new Vector<Text>();
        for (Text val : values){
            // make copy of val before adding to the Vector
            vals.add(new Text(val));
        }
        return;
    }
}
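If you want to see the effect without a cluster, here is a minimal sketch in plain Java that imitates the behaviour. Note the assumptions: MutableText is a hypothetical stand-in for Hadoop's Text, and reusingValues is a made-up iterable that hands back one shared instance on every step, which is roughly what the reducer's values iterator does. It is an illustration of the aliasing, not Hadoop's actual implementation.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

class ObjectReuseDemo {

    // Hypothetical stand-in for a mutable Writable such as Hadoop's Text.
    static final class MutableText {
        private String content;

        MutableText(String content) { this.content = content; }

        // Copy constructor, playing the role of new Text(val).
        MutableText(MutableText other) { this.content = other.content; }

        void set(String content) { this.content = content; }

        @Override
        public String toString() { return content; }
    }

    // Made-up iterable that returns the SAME object on every call to next(),
    // changing only its contents (an assumption modelled on the reducer's values).
    public static Iterable<MutableText> reusingValues(final String... raw) {
        return () -> new Iterator<MutableText>() {
            private final MutableText shared = new MutableText("");
            private int index = 0;

            @Override
            public boolean hasNext() { return index < raw.length; }

            @Override
            public MutableText next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                shared.set(raw[index++]);
                return shared;
            }
        };
    }

    // Mirrors the question: stores the reference handed out by the iterator.
    public static List<String> collectReferences(Iterable<MutableText> values) {
        List<MutableText> vals = new ArrayList<>();
        for (MutableText val : values) {
            vals.add(val);
        }
        return render(vals);
    }

    // Mirrors the fix: copies each value before storing it.
    public static List<String> collectCopies(Iterable<MutableText> values) {
        List<MutableText> vals = new ArrayList<>();
        for (MutableText val : values) {
            vals.add(new MutableText(val));
        }
        return render(vals);
    }

    // Reads the stored objects only after the iteration has finished.
    private static List<String> render(List<MutableText> vals) {
        List<String> out = new ArrayList<>();
        for (MutableText val : vals) {
            out.add(val.toString());
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(collectReferences(reusingValues("b", "c"))); // prints [c, c]
        System.out.println(collectCopies(reusingValues("b", "c")));     // prints [b, c]
    }
}
```

The first call stores the same reference twice, so both entries show the last value written; the second call stores independent copies, which is what new Text(val) gives you in the real reducer.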
Upvotes: 7