I have set up a Hadoop job like so:
public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "Legion");
    job.setJarByClass(Legion.class);

    job.setMapperClass(CallQualityMap.class);
    job.setReducerClass(CallQualityReduce.class);

    // Explicitly configure map and reduce outputs, since they're different classes
    job.setMapOutputKeyClass(CallSampleKey.class);
    job.setMapOutputValueClass(CallSample.class);
    job.setOutputKeyClass(NullWritable.class);
    job.setOutputValueClass(Text.class);

    job.setInputFormatClass(CombineRepublicInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);

    CombineRepublicInputFormat.setMaxInputSplitSize(job, 128000000);
    CombineRepublicInputFormat.setInputDirRecursive(job, true);
    CombineRepublicInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    job.waitForCompletion(true);
}
This job completes, but something strange happens: I get one output line per input line. Each output line consists of the output of CallSampleKey.toString(), then a tab, then something like CallSample@17ab34d.
This suggests that my reduce() method is never called, and that the CallSampleKey and CallSample objects are being passed straight through to the TextOutputFormat. But I don't understand why this would be the case. I've very clearly specified job.setReducerClass(CallQualityReduce.class);, so I have no idea why it would skip the reducer!
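The CallSample@17ab34d part is what Java's default Object.toString() produces (the class name, an @, and the hash code in hex) for any class that doesn't override toString(). With its default settings, TextOutputFormat writes the string form of the key, a tab, and the string form of the value for each record, which matches the observed lines. The minimal sketch below only mimics that line layout; the stand-in classes, their fields, and the formatLine helper are hypothetical and are not part of Hadoop or of the code above.

```java
public class DefaultToStringDemo {
    // Hypothetical stand-in for CallSample: it does NOT override toString(),
    // so it inherits Object.toString() -> "<class name>@<hex hash code>".
    static class CallSample {
        final double jitterMs;
        CallSample(double jitterMs) { this.jitterMs = jitterMs; }
    }

    // Hypothetical stand-in for CallSampleKey, which DOES override toString().
    static class CallSampleKey {
        final String id;
        CallSampleKey(String id) { this.id = id; }
        @Override
        public String toString() { return "CallSampleKey[" + id + "]"; }
    }

    // Mimics TextOutputFormat's default line layout: key, tab, value.
    static String formatLine(Object key, Object value) {
        return key + "\t" + value;
    }

    public static void main(String[] args) {
        // Prints the key's custom string, a tab, then something like
        // DefaultToStringDemo$CallSample@1b6d3586 (nested class names include the outer class).
        System.out.println(formatLine(new CallSampleKey("call-1"), new CallSample(12.5)));
    }
}
```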
Edit: Here's the code for the reducer:
public static class CallQualityReduce extends Reducer<CallSampleKey, CallSample, NullWritable, Text> {
    public void reduce(CallSampleKey inKey, Iterator<CallSample> inValues, Context context) throws IOException, InterruptedException {
        Call call = new Call(inKey.getId().toString(), inKey.getUuid().toString());

        while (inValues.hasNext()) {
            call.addSample(inValues.next());
        }

        context.write(NullWritable.get(), new Text(call.getStats()));
    }
}
What if you try changing your
public void reduce(CallSampleKey inKey, Iterator<CallSample> inValues, Context context) throws IOException, InterruptedException {
to use Iterable instead of Iterator?
public void reduce(CallSampleKey inKey, Iterable<CallSample> inValues, Context context) throws IOException, InterruptedException {
You'll then have to call inValues.iterator() to get the actual iterator, or simply loop over inValues with a for-each loop.
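If the method signature doesn't match, your method merely overloads reduce() instead of overriding it, so Hadoop falls through to the inherited default Reducer.reduce(). That default is an identity implementation: it writes every key/value pair out unchanged, which is why you get one output line per record your mapper emits. It's perhaps unfortunate that the default implementation makes this kind of typo hard to detect, but the next best thing is to annotate every method you intend to override with @Override, so that the compiler reports a mismatched signature as an error.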
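The overload-versus-override trap can be reproduced without Hadoop. This is a minimal sketch under an explicit assumption: MiniReducer is a hypothetical, heavily simplified stand-in for Hadoop's Reducer (no Context, output collected in a list). Its default reduce() passes every value through, and its run() only ever calls the Iterable version, much like Hadoop's Reducer.run() calls reduce(key, values, context). BrokenReducer mirrors the question's Iterator signature; FixedReducer uses Iterable plus @Override.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class OverloadPitfallDemo {
    // Hypothetical, simplified model of a reducer base class (NOT Hadoop's API).
    static class MiniReducer<K, V> {
        final List<String> output = new ArrayList<>();

        // Default behaviour: identity, one output line per input value.
        protected void reduce(K key, Iterable<V> values) {
            for (V value : values) {
                output.add(key + "\t" + value);
            }
        }

        // The "framework" only ever calls the Iterable signature.
        final void run(K key, Iterable<V> values) {
            reduce(key, values);
        }
    }

    // Iterator parameter: an overload, not an override, so run() never calls it.
    // Putting @Override on this method would be a compile-time error.
    static class BrokenReducer extends MiniReducer<String, Integer> {
        protected void reduce(String key, Iterator<Integer> values) {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next();
            }
            output.add(key + "\t" + sum);
        }
    }

    // Matching signature: @Override compiles and run() dispatches here.
    static class FixedReducer extends MiniReducer<String, Integer> {
        @Override
        protected void reduce(String key, Iterable<Integer> values) {
            int sum = 0;
            for (int v : values) {
                sum += v;
            }
            output.add(key + "\t" + sum);
        }
    }

    public static void main(String[] args) {
        List<Integer> values = Arrays.asList(1, 2, 3);

        BrokenReducer broken = new BrokenReducer();
        broken.run("call-1", values);
        System.out.println(broken.output); // three pass-through lines, one per value

        FixedReducer fixed = new FixedReducer();
        fixed.run("call-1", values);
        System.out.println(fixed.output); // a single aggregated line
    }
}
```

Adding @Override to BrokenReducer.reduce() turns the silent fallthrough into a compiler error, which is exactly what the advice about @Override buys you in the real job.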