Suanmeiguo

Reputation: 1405

Hadoop reducer output is fed back into the reducer

I am testing the word count example on a 3-machine cluster. My code is the same as this example except for the following:

I added two lines to the reducer, just before the "output.collect(key, new IntWritable(sum))" line:

System.out.println(key);
key.set(key + " - Key in Reducer");

Then I checked my reducer log (the last 8 KB) and found this:

3M3WI - Key in Reducer - Key in Reducer
3M3WIG - Key in Reducer - Key in Reducer
3M3WL - Key in Reducer - Key in Reducer
3M3WNWPLG - Key in Reducer - Key in Reducer
3M3WQ - Key in Reducer - Key in Reducer
3M3WQNG.K78QJ0WN, - Key in Reducer - Key in Reducer
3M3WWR - Key in Reducer - Key in Reducer
3M3WX - Key in Reducer - Key in Reducer
3M3X - Key in Reducer - Key in Reducer
3M3X,. - Key in Reducer - Key in Reducer
3M3X.KZA8J - Key in Reducer - Key in Reducer
3M3X1 - Key in Reducer - Key in Reducer
3M3X8RC - Key in Reducer - Key in Reducer
3M3XC - Key in Reducer - Key in Reducer
3M3XCBD9R337PK - Key in Reducer - Key in Reducer
3M3XD - Key in Reducer - Key in Reducer
3M3XLW - Key in Reducer - Key in Reducer
3M3XML - Key in Reducer - Key in Reducer
3M3XN - Key in Reducer - Key in Reducer
3M3XU - Key in Reducer - Key in Reducer
3M3XX - Key in Reducer - Key in Reducer
3M3XZ - Key in Reducer - Key in Reducer
3M3Y - Key in Reducer - Key in Reducer
3M3YAIJL - Key in Reducer - Key in Reducer

This means that my reducer output was fed into the reducer again. That shouldn't be the way Hadoop works, right? It shouldn't be iterative. And my code is the same as the example on the hadoop.apache.org website.

Has anyone encountered the same issue?

Here is all my code, which is mostly the same as the example.

package test;

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

public class WordCount {

    public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
      private final static IntWritable one = new IntWritable(1);
      private Text word = new Text();

      public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
          word.set(tokenizer.nextToken());
          output.collect(word, one);
        }
      }
    }

    public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
      public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        int sum = 0;
        while (values.hasNext()) {
          sum += values.next().get();
        }
        System.out.println(key);
        key.set(key+" - Key in Reducer");
        output.collect(key, new IntWritable(sum));
      }
    }

    public static void main(String[] args) throws Exception {
      JobConf conf = new JobConf(WordCount.class);
      conf.setJobName("wordcount");

      conf.setOutputKeyClass(Text.class);
      conf.setOutputValueClass(IntWritable.class);

      conf.setMapperClass(Map.class);
      conf.setCombinerClass(Reduce.class);
      conf.setReducerClass(Reduce.class);

      conf.setInputFormat(TextInputFormat.class);
      conf.setOutputFormat(TextOutputFormat.class);

      FileInputFormat.setInputPaths(conf, new Path(args[0]));
      FileOutputFormat.setOutputPath(conf, new Path(args[1]));

      JobClient.runJob(conf);
    }
}

Upvotes: 0

Views: 464

Answers (1)

Tariq

Reputation: 34184

Comment out conf.setCombinerClass(Reduce.class); and it should be OK. This happens because you are using your Reducer class as your Combiner as well.

When a combiner is set, the output of map() is passed to the combiner first, and the combiner's output is what gets sent to reduce() on a reducer machine. Since your combiner is the same Reduce class, the keys arriving at reduce() already have " - Key in Reducer" appended, and reduce() appends it once more. Hadoop may also run the combiner more than once (or not at all), so the suffix can be added several times before the reducer ever sees the key. That's why you are getting "Key in Reducer" repeated.
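To make the flow above concrete, here is a minimal plain-Java sketch that imitates map, combine, and reduce without using any Hadoop classes. The class name CombinerDemo and its helper methods are hypothetical, and the sketch assumes exactly one combiner pass; it is only meant to illustrate why a key-modifying reducer misbehaves when it is reused as a combiner.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for the job in the question; no Hadoop API is involved.
class CombinerDemo {
    static final String SUFFIX = " - Key in Reducer";

    // Mimics the question's reduce(): sums the counts and appends the suffix to the key.
    static Map.Entry<String, Integer> reduceWithSuffix(String key, List<Integer> values) {
        int sum = 0;
        for (int v : values) {
            sum += v;
        }
        return new AbstractMap.SimpleEntry<>(key + SUFFIX, sum);
    }

    // Groups (key, value) pairs by key, roughly what the shuffle/sort step does.
    static Map<String, List<Integer>> group(List<Map.Entry<String, Integer>> pairs) {
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (Map.Entry<String, Integer> p : pairs) {
            grouped.computeIfAbsent(p.getKey(), k -> new ArrayList<>()).add(p.getValue());
        }
        return grouped;
    }

    // Runs reduceWithSuffix once over every group of keys.
    static List<Map.Entry<String, Integer>> reducePass(List<Map.Entry<String, Integer>> pairs) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (Map.Entry<String, List<Integer>> g : group(pairs).entrySet()) {
            out.add(reduceWithSuffix(g.getKey(), g.getValue()));
        }
        return out;
    }

    // Map step emits (word, 1); the optional combiner pass reuses the reducer logic.
    static List<Map.Entry<String, Integer>> run(List<String> words, boolean useCombiner) {
        List<Map.Entry<String, Integer>> mapped = new ArrayList<>();
        for (String w : words) {
            mapped.add(new AbstractMap.SimpleEntry<>(w, 1));
        }
        List<Map.Entry<String, Integer>> reducerInput = useCombiner ? reducePass(mapped) : mapped;
        return reducePass(reducerInput);
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("a", "b", "a");
        System.out.println("with combiner:    " + run(words, true));
        System.out.println("without combiner: " + run(words, false));
    }
}
```

With the combiner pass enabled, every output key carries the suffix twice; without it, the suffix appears once and the counts are unchanged. In the real job, an alternative to removing setCombinerClass would be a separate combiner class that only sums the values and emits the key untouched, since a combiner is generally expected to leave keys as it found them.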

Upvotes: 2
