Karan Rekhi
Karan Rekhi

Reputation: 25

duplicates in mapreduce program output?

I get many duplicate values in my output, so I have implemented a reduce function as shown below, but still this reduce works as an identity function, that is there is no difference in output even if I have a reduce or not. What's wrong with my reduce function?

       public class search 
{      
    public static String str="And";
    public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> 
    {
        String mname="";
        public void configure(JobConf job)
        {
             mname=job.get(str);
             job.set(mname,str);
        }

        private Text word = new Text();
        public Text Uinput =new Text("");
        public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException 
        {

            String mapstr=mname;
            Uinput.set(mapstr);
            String line = value.toString();
            Text fdata = new Text();

            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens())
            {
                word.set(tokenizer.nextToken());
                fdata.set(line);

                if(word.equals(Uinput))
                output.collect(fdata,new Text(""));
            }

        }
    } 

    public static class SReducer extends MapReduceBase implements Reducer<Text, Text, Text, Text> 
    {
        public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException 
        {

            boolean start = true;
            //System.out.println("inside reduce   :"+input);
            StringBuilder sb = new StringBuilder();
            while (values.hasNext()) 
            {
                if(!start)

                start=false;
                sb.append(values.next().toString());

            }
            //output.collect(key, new IntWritable(sum));
            output.collect(key, new Text(sb.toString()));
        }
    }

public static void main(String[] args) throws Exception {

    JobConf conf = new JobConf(search.class);
    conf.setJobName("QueryIndex");
    //JobConf conf = new JobConf(getConf(), WordCount.class);
    conf.set(str,args[0]);

    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(Text.class);

    conf.setMapperClass(Map.class);
    //conf.setCombinerClass(SReducer.class);
    conf.setReducerClass(SReducer.class);

    conf.setInputFormat(TextInputFormat.class);
    conf.setOutputFormat(TextOutputFormat.class);



    FileInputFormat.setInputPaths(conf, new Path("IIndexOut"));
    FileOutputFormat.setOutputPath(conf, new Path("searchOut"));

    JobClient.runJob(conf);
}

}

Upvotes: 1

Views: 2411

Answers (3)

Arockiaraj Durairaj
Arockiaraj Durairaj

Reputation: 56

use @override annotation before map and reduce functions. So that you can be very sure, that you are overriding the base class method.

Upvotes: 0

sulabhc
sulabhc

Reputation: 666

I have not looked at the code thoroughly, but one thing I am certain about is the boolean variable start is useless here, the code below if (!start) should be in brackets to de-dup data, otherwise you just end up writing all the data in reducer which you receive from mapper.

 public static class SReducer extends MapReduceBase implements Reducer<Text, Text, Text, Text> 
{
    public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException 
    {

        boolean start = true;
        //System.out.println("inside reduce   :"+input);
        StringBuilder sb = new StringBuilder();
        while (values.hasNext()) 
        {
            if(!start)
            {
               start=false;
               sb.append(values.next().toString());
            }

        }
        //output.collect(key, new IntWritable(sum));
        output.collect(key, new Text(sb.toString()));
    }
}

Or an optimal reduce method would be to just :-

public static class SReducer extends MapReduceBase implements Reducer<Text, Text, Text, Text> 
  {
  public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException 
{

   //System.out.println("inside reduce   :"+input);
    StringBuilder sb = new StringBuilder();
    sb.append(values.next().toString());

    //output.collect(key, new IntWritable(sum));
    output.collect(key, new Text(sb.toString()));
}

}

As you only care about the 1st value of the iterator.

Upvotes: 1

Chaos
Chaos

Reputation: 11721

Maybe you have not set this reducer as the actual reduce function to be used? That is done using

job.setReducerClass(). 

If you don't set the class as your class, then the default reducer is used. You should do the following:

job.setReducerClass(SReducer.class)

please post your main function so that we can verify that.

Upvotes: 0

Related Questions