Rahuul
Rahuul

Reputation: 31

MapReduce - Reducer giving wrong output date

I have written a MapReduce program to parse the values from CSV.

The data set is as follows -

PRAVEEN,40020,Baby,026A2,12/04/2015

PRAVEEN,40020,TOY,0383,1/04/2014

PRAVEEN,2727272,BOOK,03383,03/14/2013

PRAVEEN,22636,BIKE,7373737,12/24/2012

My Map function is reading the first value(ie UserName) from CSV as KEY and last value ie Date as VALUE

My Reduce function is also very simple , I have to select the latest date as the VALUE from the list of VALUES for a particular KEY ie UserName

The code is as Follows -

  package com.test.mapreduce;
  import java.io.IOException;
  import java.text.ParseException;
  import java.text.SimpleDateFormat;
  import java.util.ArrayList;
  import java.util.Date;
  import java.util.HashSet;
  import java.util.Iterator;
  import java.util.List;
  import java.util.Set;

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.conf.Configured;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.io.IntWritable;
  import org.apache.hadoop.io.LongWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapred.FileInputFormat;
  import org.apache.hadoop.mapred.FileOutputFormat;
  import org.apache.hadoop.mapred.JobClient;
  import org.apache.hadoop.mapred.JobConf;
  import org.apache.hadoop.mapred.KeyValueTextInputFormat;
  import org.apache.hadoop.mapred.MapReduceBase;
  import org.apache.hadoop.mapred.Mapper;
  import org.apache.hadoop.mapred.OutputCollector;
  import org.apache.hadoop.mapred.Reducer;
  import org.apache.hadoop.mapred.Reporter;
  import org.apache.hadoop.mapred.TextInputFormat;
  import org.apache.hadoop.mapred.TextOutputFormat;
  import org.apache.hadoop.util.Tool;
  import org.apache.hadoop.util.ToolRunner;





 public class RetailCustomerAnalysis_2 extends Configured implements Tool {
             public static class MapClass extends MapReduceBase
             implements Mapper<LongWritable, Text, Text, Text> {

      private Text key1 = new Text();
      private Text value1 = new Text();
      private int noofFields = 5;



 public void map(LongWritable key, Text value,
                 OutputCollector<Text, Text> output,
                 Reporter reporter) throws IOException {

        String line = value.toString().replaceAll("\\s+","");
        String[] split = line.split(",");


        if(split.length!=noofFields){
        return;
        }

        else {
            key1.set(split[0].toString().trim()); 
            value1.set(split[4].toString().trim());
            System.out.println(split[4].toString().trim());
            output.collect(key1, value1);
     }
    }
  }

 public static class Reduce extends MapReduceBase
 implements Reducer<Text, Text, Text, Text> {

 public void reduce(Text key, Iterator<Text> values,
                    OutputCollector<Text, Text> output,
                    Reporter reporter) throws IOException {

     SimpleDateFormat formatter = new SimpleDateFormat("MM/dd/yyyy");
     Date date = new Date();

     List<Text> dateList = new ArrayList<Text>();

     for(Iterator<Text> it = values; it.hasNext();) {
         // add the values in the arrayList
         dateList.add((Text) it.next());
     }


     if(dateList.size()==1){ //If the mapper output has only one date , then select that date 
                             // as the VALUE
     try  {
            date = formatter.parse(dateList.get(0).toString());
          } catch (ParseException e) {
            e.printStackTrace();
        }
     } //If part ends 

     else {
             try {
               date = formatter.parse(dateList.get(0).toString()); 
                      //select the first date from list
             } catch (ParseException e1) {
               e1.printStackTrace();
             }

             for(int i=0 ; i <dateList.size();++i){
                   try {
                   //compare the selected date with the rest of the dates in the list.
                   if((formatter.parse(dateList.get(i).toString())).compareTo(date)>0){
                       date=formatter.parse(dateList.get(i).toString());
                       // getting the max date from the list
                        }
                   }
                   catch (ParseException e) {
                  e.printStackTrace();
                }
             } //for loops ends
     }  // else part ends    

     Text value = new Text(date.toString());
       output.collect(key, value);
      }
  }



 public int run(String[] args) throws Exception {
 Configuration conf = getConf();

 JobConf job = new JobConf(conf, RetailCustomerAnalysis_2.class);

 Path in = new Path(args[0]);
 Path out = new Path(args[1]);
 FileInputFormat.setInputPaths(job, in);
 FileOutputFormat.setOutputPath(job, out);

 job.setJobName("RetailCustomerAnalysis_2");
 job.setMapperClass(MapClass.class);
 job.setReducerClass(Reduce.class);

 job.setInputFormat(TextInputFormat.class);
 job.setOutputFormat(TextOutputFormat.class);
 job.setOutputKeyClass(Text.class);
 job.setOutputValueClass(Text.class);
 job.set("key.value.separator.in.input.line", ",");

 JobClient.runJob(job);
 return 0;
  }

public static void main(String[] args) throws Exception { 
 int res = ToolRunner.run(new Configuration(), new RetailCustomerAnalysis_2(), args);

 System.exit(res);
 }

 }

But I'm getting random date from the list as result. Can Anyone help.

Upvotes: 0

Views: 718

Answers (1)

pranabh
pranabh

Reputation: 26

Code is mostly correct. The reducer implementation has to be modified slightly. The below code snipper creating the problem

for(Iterator<Text> it = values; it.hasNext();) {
   // add the values in the arrayList
   dateList.add((Text) it.next());
}

In the above code snippet same value object is used in each itreation, only their contents are being changed.

For example, Assume Mapreduce run with following input

PRAVEEN,4002013454,Baby,026A12,12/04/2015

PRAVEEN,4002013454,TOY,020383,1/04/2014

PRAVEEN,2727272727272,BOOK,03383,03/14/2013

PRAVEEN,2263637373,BIKE,7373737,12/24/2012

In the reduce method 'dateList' object elemets has values (12/24/2012, 12/24/2012, 12/24/2012, 12/24/2012) after the for loop completion on the values. This leads to incorrect execution of the remaining code and final output is wrong.

Instead you change code as below

public static class Reduce extends MapReduceBase implements Reducer<Text, Text, Text, Text> {

    public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {

        SimpleDateFormat formatter = new SimpleDateFormat("MM/dd/yyyy");
        Date date = new Date();
        //-----Modified section START-----------
        List<String> dateList = new ArrayList<String>();

        for(Iterator<Text> it = values; it.hasNext();) {
            // add the values in the arrayList
            dateList.add(((Text)it.next()).toString());
        }
        //----Modified section END--------------
        if(dateList.size()==1){ //If the mapper output has only one date , then select that date 
            // as the VALUE
            try  {
                date = formatter.parse(dateList.get(0).toString());
            } catch (ParseException e) {
                e.printStackTrace();
            }
        } //If part ends 
        else {
            String str = dateList.get(0).toString();
            try {

                date = formatter.parse(dateList.get(0).toString());
                //select the first date from list
            } catch (ParseException e1) {
                e1.printStackTrace();
            }

            for(int i=0 ; i <dateList.size();++i){
                try {
                    //compare the selected date with the rest of the dates in the list.
                    if((formatter.parse(dateList.get(i).toString())).compareTo(date)>0){
                        date=formatter.parse(dateList.get(i).toString());
                        // getting the max date from the list
                    }
                }
                catch (ParseException e) {
                    e.printStackTrace();
                }
            } //for loops ends
        }  // else part ends    

        Text value = new Text(date.toString());
        output.collect(key, value);
    }
}

Please refer Hadoop Reducer Values in Memory? for more details about object reference in map,reduce methods.

Upvotes: 0

Related Questions