Abhi
Abhi

Reputation: 1255

reducer not being called in the mapreduce program

I am writign a simple extension on Mapreduce program and found that my code is only displaying output from Map(). Mapred job runs in eclipse without any errors but does not invoke reduce().

Here is my map():

public static class KVMapper 
  extends Mapper<Text, Text, IntWritable, Text>{
//        extends Mapper<Text, Text, Text, IntWritable>{
    private final static IntWritable one = new IntWritable(1);
    private String word;// = new Text();
    private IntWritable iw;
    private final LongWritable val = new LongWritable();
    public void map(Text key, Text value , Context context
                    ) throws IOException, InterruptedException {

      iw = new IntWritable(Integer.parseInt(value.toString()));
      System.out.println(value +" hello , world  " +key );
      context.write(iw, key);
      }
    }

Reduce()

public static class KVReducer 
       extends Reducer<IntWritable,Text,IntWritable, Text> {

      KVReducer(){

          System.out.println("Inside reducer");
      }
        public void reduce(IntWritable key, Text value, 
                       Context context
                       ) throws IOException, InterruptedException {
      System.out.println(value +" hello2 , world  " +key );
      context.write(key, value);
    }
  }

main()

public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", "\t");
    //conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator",",");
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length < 2) {
      System.err.println("Usage: wordcount <in> [<in>...] <out>");
      System.exit(2);
    }
    Job job = new Job(conf, "word desc");
    job.setInputFormatClass(KeyValueTextInputFormat.class);
    job.setJarByClass(WordDesc.class);
    job.setMapperClass(KVMapper.class);
    job.setCombinerClass(KVReducer.class);
    job.setReducerClass(KVReducer.class);
    job.setMapOutputKeyClass(IntWritable.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputKeyClass(IntWritable.class);
    job.setOutputValueClass(Text.class);
    for (int i = 0; i < otherArgs.length - 1; ++i) {
      FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
    }
    FileOutputFormat.setOutputPath(job,
      new Path(otherArgs[otherArgs.length - 1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }

Sample of the input:

1500s   1
1960s   1
Aldus   1

Sample output from the program, while I was expecting mapper to reverse key and value pairs

1500s   1
1960s   1
Aldus   1

Not sure why the reduce() is not being invoked in the above code

Upvotes: 0

Views: 317

Answers (1)

Mahendra
Mahendra

Reputation: 1426

You are not overriding reduce() method of Reducer class.

For your case its signature should be like public void reduce(IntWritable key, Iterable<Text> values,Context context)

Here is updated KVReducer

public static class KVReducer 
       extends Reducer<IntWritable,Text,IntWritable, Text> {

      KVReducer(){

          System.out.println("Inside reducer");
      }

      public void reduce(IntWritable key, Iterable<Text> values,Context context) throws IOException, InterruptedException {
        for(Text value: values){}
          System.out.println(value +" hello2 , world  " +key );
          context.write(key, value);
        }
      }
}

Upvotes: 3

Related Questions