Hi all,
I have a simple map/reduce implementation. The mapper is called and does its job, but the reducer is never called.
Here is the mapper:
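import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Nested inside InteractionMapReduce
static public class InteractionMap extends Mapper<LongWritable, Text, Text, InteractionWritable> {
    @Override
    protected void map(LongWritable offset, Text text, Context context) throws IOException, InterruptedException {
        System.out.println("mapper");
        // Each input line is a CSV record: field 1 = source user, field 2 = target user, field 4 = points
        String[] tokens = text.toString().split(",");
        String sourceUser = tokens[1];
        String targetUser = tokens[2];
        int points = Integer.parseInt(tokens[4]);
        // Emit exactly one (sourceUser, interaction) pair per input record
        context.write(new Text(sourceUser), new InteractionWritable(targetUser, points));
    }
}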
Here is my reducer:
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Nested inside InteractionMapReduce
static public class InteractionReduce extends Reducer<Text, InteractionWritable, Text, Text> {
    @Override
    protected void reduce(Text token, Iterable<InteractionWritable> counts, Context context) throws IOException, InterruptedException {
        System.out.println("REDUCER");
        // Write one "<source> <target> <points>" line per interaction of this source user
        for (InteractionWritable interaction : counts) {
            context.write(token, new Text(token.toString() + " " + interaction.getTargetUser().toString() + " " + interaction.getPoints().get()));
        }
    }
}
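For reference, the data flow this mapper/reducer pair is meant to produce can be simulated in plain Java with no Hadoop dependency. This is a minimal sketch, assuming the CSV layout implied by the mapper (source user in field 1, target user in field 2, points in field 4); the Interaction class is a hypothetical stand-in for InteractionWritable and the sample input is made up.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class InteractionFlowSketch {

    /** Hypothetical stand-in for InteractionWritable: a target user and a point count. */
    static final class Interaction {
        final String targetUser;
        final int points;

        Interaction(String targetUser, int points) {
            this.targetUser = targetUser;
            this.points = points;
        }
    }

    /**
     * Runs map, group-by-key and reduce over CSV lines.
     * Assumes fields 1, 2 and 4 are source user, target user and points.
     */
    static List<String> run(List<String> lines) {
        // Map + shuffle: group Interaction values under their source-user key.
        // TreeMap sorts keys, much like Hadoop sorts keys before the reduce phase;
        // Hadoop does NOT guarantee value order within a key (here it is input order).
        Map<String, List<Interaction>> grouped = new TreeMap<>();
        for (String line : lines) {
            String[] tokens = line.split(",");
            String sourceUser = tokens[1];
            Interaction value = new Interaction(tokens[2], Integer.parseInt(tokens[4]));
            grouped.computeIfAbsent(sourceUser, k -> new ArrayList<>()).add(value);
        }

        // Reduce: one output line per interaction, laid out like TextOutputFormat (key TAB value)
        List<String> output = new ArrayList<>();
        for (Map.Entry<String, List<Interaction>> group : grouped.entrySet()) {
            String key = group.getKey();
            for (Interaction interaction : group.getValue()) {
                String value = key + " " + interaction.targetUser + " " + interaction.points;
                output.add(key + "\t" + value);
            }
        }
        return output;
    }

    public static void main(String[] args) {
        List<String> sample = List.of("1,alice,bob,x,3", "2,bob,alice,x,5", "3,alice,carol,x,2");
        run(sample).forEach(System.out::println);
    }
}
```

With the sample input, the first output line is "alice", a tab, then "alice bob 3"; the source user appears twice because the reducer repeats the key inside the value.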
And here is the configuration part:
@Override
public int run(String[] args) throws Exception {
    Configuration configuration = getConf();
    Job job = new Job(configuration, "Interaction Count");
    job.setJarByClass(InteractionMapReduce.class);
    job.setMapperClass(InteractionMap.class);
    job.setCombinerClass(InteractionReduce.class);
    job.setReducerClass(InteractionReduce.class);
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    return job.waitForCompletion(true) ? 0 : -1;
}
Does anyone have any idea why the reducer is not being invoked?
OK, it was my fault, as expected. The job configuration was wrong. This is how it should look:
Configuration configuration = getConf();
Job job = new Job(configuration, "Interaction Count");
job.setJarByClass(InteractionMapReduce.class);
job.setMapperClass(InteractionMap.class);
job.setReducerClass(InteractionReduce.class);
// Map output types differ from the final output types, so declare them explicitly
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(InteractionWritable.class);
// Final (reducer) output types
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
return job.waitForCompletion(true) ? 0 : -1;
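The problem occurred because the map and reduce phases have different output types. Unless setMapOutputKeyClass and setMapOutputValueClass are called, Hadoop assumes the map output types are the same as the job output types (here Text and Text), so the InteractionWritable values written by the mapper did not match, and the job failed silently after the context.write call. The corrected configuration also drops setCombinerClass: InteractionReduce writes Text values, so it cannot serve as a combiner for InteractionWritable map output. So, what I had to add are these lines: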
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(InteractionWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
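The rule behind this fix can be illustrated with a toy model; this is not Hadoop's code. The effective map output class is the explicit setMapOutputKeyClass/setMapOutputValueClass setting if there is one, otherwise the job's output class, and map-side writes are checked by exact class comparison. Text and InteractionWritable below are empty hypothetical stand-ins, and the method names are made up for illustration.

```java
public class MapOutputTypeSketch {

    // Hypothetical stand-ins for org.apache.hadoop.io.Text and the asker's InteractionWritable
    static final class Text {}
    static final class InteractionWritable {}

    /** Effective map output class: the explicit setting if present (non-null), else the job output class. */
    static Class<?> effective(Class<?> mapOutputClass, Class<?> jobOutputClass) {
        return mapOutputClass != null ? mapOutputClass : jobOutputClass;
    }

    /**
     * Returns null if a map-side write of (key, value) would be accepted,
     * otherwise a description of the mismatch (exact class comparison).
     */
    static String checkMapWrite(Class<?> mapOutKey, Class<?> mapOutValue,
                                Class<?> jobOutKey, Class<?> jobOutValue,
                                Object key, Object value) {
        Class<?> expectedKey = effective(mapOutKey, jobOutKey);
        Class<?> expectedValue = effective(mapOutValue, jobOutValue);
        if (key.getClass() != expectedKey) {
            return "Type mismatch in key: expected " + expectedKey.getSimpleName()
                    + ", received " + key.getClass().getSimpleName();
        }
        if (value.getClass() != expectedValue) {
            return "Type mismatch in value: expected " + expectedValue.getSimpleName()
                    + ", received " + value.getClass().getSimpleName();
        }
        return null;
    }

    public static void main(String[] args) {
        Object key = new Text();
        Object value = new InteractionWritable();
        // Original configuration: only the job output classes (Text, Text) are set
        System.out.println(checkMapWrite(null, null, Text.class, Text.class, key, value));
        // Fixed configuration: map output classes set explicitly
        System.out.println(checkMapWrite(Text.class, InteractionWritable.class, Text.class, Text.class, key, value));
    }
}
```

The first call reports a value mismatch (expected Text, received InteractionWritable); the second returns null, meaning the write is accepted.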
Are you sure that the text in your Mapper's map method has some data? And do you want the Reducer to be the Combiner as well as the Reducer?
I always have one main class, InteractionMapReduce, and inside it I have the InteractionMap and the InteractionReduce classes.
So when setting the Mapper and the Reducer classes in the job, I set them as InteractionMapReduce.InteractionMap.class and InteractionMapReduce.InteractionReduce.class.
I do not know whether this will help you, but you could try it.
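On the question of using InteractionReduce as the Combiner as well: a combiner runs on map output before the shuffle, so it has to both consume and produce the map output key/value types. The toy check below (hypothetical names, not a Hadoop API) shows why InteractionReduce, declared as Reducer<Text, InteractionWritable, Text, Text>, does not qualify when the map output is (Text, InteractionWritable).

```java
public class CombinerContractSketch {

    // Hypothetical stand-ins for Text and InteractionWritable
    static final class Text {}
    static final class InteractionWritable {}

    /** Describes a Reducer<KIN, VIN, KOUT, VOUT> by its four type parameters. */
    static final class ReducerTypes {
        final Class<?> inKey, inValue, outKey, outValue;

        ReducerTypes(Class<?> inKey, Class<?> inValue, Class<?> outKey, Class<?> outValue) {
            this.inKey = inKey;
            this.inValue = inValue;
            this.outKey = outKey;
            this.outValue = outValue;
        }
    }

    /** True only if the reducer reads AND writes exactly the map output key/value types. */
    static boolean usableAsCombiner(ReducerTypes r, Class<?> mapOutKey, Class<?> mapOutValue) {
        boolean readsMapOutput = r.inKey == mapOutKey && r.inValue == mapOutValue;
        boolean writesMapOutput = r.outKey == mapOutKey && r.outValue == mapOutValue;
        return readsMapOutput && writesMapOutput;
    }

    public static void main(String[] args) {
        // InteractionReduce extends Reducer<Text, InteractionWritable, Text, Text>
        ReducerTypes interactionReduce =
                new ReducerTypes(Text.class, InteractionWritable.class, Text.class, Text.class);
        // Map output: Text key, InteractionWritable value
        System.out.println(usableAsCombiner(interactionReduce, Text.class, InteractionWritable.class));
    }
}
```

This prints false because the output value type (Text) differs from the map output value type; a separate reducer that emits InteractionWritable values would be needed if a combiner is wanted.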