Reputation: 31
I have written a MapReduce program to parse the values from a CSV file.
The data set is as follows:
PRAVEEN,40020,Baby,026A2,12/04/2015
PRAVEEN,40020,TOY,0383,1/04/2014
PRAVEEN,2727272,BOOK,03383,03/14/2013
PRAVEEN,22636,BIKE,7373737,12/24/2012
My map function reads the first value (i.e. the user name) from each CSV line as the KEY, and the last value (i.e. the date) as the VALUE.
My reduce function is also very simple: for a particular KEY (i.e. user name), it has to select the latest date from the list of VALUES and emit it as the VALUE.
The code is as follows:
package com.test.mapreduce;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.KeyValueTextInputFormat;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class RetailCustomerAnalysis_2 extends Configured implements Tool {
public static class MapClass extends MapReduceBase
implements Mapper<LongWritable, Text, Text, Text> {
private Text key1 = new Text();
private Text value1 = new Text();
private int noofFields = 5;
public void map(LongWritable key, Text value,
OutputCollector<Text, Text> output,
Reporter reporter) throws IOException {
String line = value.toString().replaceAll("\\s+","");
String[] split = line.split(",");
if(split.length!=noofFields){
return;
}
else {
key1.set(split[0].toString().trim());
value1.set(split[4].toString().trim());
System.out.println(split[4].toString().trim());
output.collect(key1, value1);
}
}
}
public static class Reduce extends MapReduceBase
implements Reducer<Text, Text, Text, Text> {
public void reduce(Text key, Iterator<Text> values,
OutputCollector<Text, Text> output,
Reporter reporter) throws IOException {
SimpleDateFormat formatter = new SimpleDateFormat("MM/dd/yyyy");
Date date = new Date();
List<Text> dateList = new ArrayList<Text>();
for(Iterator<Text> it = values; it.hasNext();) {
// add the values in the arrayList
dateList.add((Text) it.next());
}
if(dateList.size()==1){ //If the mapper output has only one date , then select that date
// as the VALUE
try {
date = formatter.parse(dateList.get(0).toString());
} catch (ParseException e) {
e.printStackTrace();
}
} //If part ends
else {
try {
date = formatter.parse(dateList.get(0).toString());
//select the first date from list
} catch (ParseException e1) {
e1.printStackTrace();
}
for(int i=0 ; i <dateList.size();++i){
try {
//compare the selected date with the rest of the dates in the list.
if((formatter.parse(dateList.get(i).toString())).compareTo(date)>0){
date=formatter.parse(dateList.get(i).toString());
// getting the max date from the list
}
}
catch (ParseException e) {
e.printStackTrace();
}
} //for loops ends
} // else part ends
Text value = new Text(date.toString());
output.collect(key, value);
}
}
public int run(String[] args) throws Exception {
Configuration conf = getConf();
JobConf job = new JobConf(conf, RetailCustomerAnalysis_2.class);
Path in = new Path(args[0]);
Path out = new Path(args[1]);
FileInputFormat.setInputPaths(job, in);
FileOutputFormat.setOutputPath(job, out);
job.setJobName("RetailCustomerAnalysis_2");
job.setMapperClass(MapClass.class);
job.setReducerClass(Reduce.class);
job.setInputFormat(TextInputFormat.class);
job.setOutputFormat(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.set("key.value.separator.in.input.line", ",");
JobClient.runJob(job);
return 0;
}
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new RetailCustomerAnalysis_2(), args);
System.exit(res);
}
}
But I'm getting a random date from the list as the result. Can anyone help?
Upvotes: 0
Views: 718
Reputation: 26
The code is mostly correct; only the reducer implementation has to be modified slightly. The code snippet below is what creates the problem:
// NOTE: it.next() returns the SAME Text instance on every iteration with only its contents
// overwritten, so every element added below ends up referring to one object that holds the
// last value read. Copy the value (e.g. toString() or new Text(...)) if it must be kept.
for(Iterator<Text> it = values; it.hasNext();) {
// add the values in the arrayList
dateList.add((Text) it.next());
}
In the above code snippet, the same value object is returned on every iteration; Hadoop only changes its contents. As a result, every element added to the list is a reference to that one object.
For example, assume MapReduce runs with the following input:
PRAVEEN,4002013454,Baby,026A12,12/04/2015
PRAVEEN,4002013454,TOY,020383,1/04/2014
PRAVEEN,2727272727272,BOOK,03383,03/14/2013
PRAVEEN,2263637373,BIKE,7373737,12/24/2012
In the reduce method, after the for loop over the values completes, the elements of 'dateList' have the values (12/24/2012, 12/24/2012, 12/24/2012, 12/24/2012): every element refers to the same reused object, which holds the last value read. The rest of the code therefore runs on wrong data, and the final output is wrong.
Instead, change the code as follows:
public static class Reduce extends MapReduceBase implements Reducer<Text, Text, Text, Text> {
public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
SimpleDateFormat formatter = new SimpleDateFormat("MM/dd/yyyy");
Date date = new Date();
//-----Modified section START-----------
List<String> dateList = new ArrayList<String>();
for(Iterator<Text> it = values; it.hasNext();) {
// add the values in the arrayList
dateList.add(((Text)it.next()).toString());
}
//----Modified section END--------------
if(dateList.size()==1){ //If the mapper output has only one date , then select that date
// as the VALUE
try {
date = formatter.parse(dateList.get(0).toString());
} catch (ParseException e) {
e.printStackTrace();
}
} //If part ends
else {
String str = dateList.get(0).toString();
try {
date = formatter.parse(dateList.get(0).toString());
//select the first date from list
} catch (ParseException e1) {
e1.printStackTrace();
}
for(int i=0 ; i <dateList.size();++i){
try {
//compare the selected date with the rest of the dates in the list.
if((formatter.parse(dateList.get(i).toString())).compareTo(date)>0){
date=formatter.parse(dateList.get(i).toString());
// getting the max date from the list
}
}
catch (ParseException e) {
e.printStackTrace();
}
} //for loops ends
} // else part ends
Text value = new Text(date.toString());
output.collect(key, value);
}
}
Please refer to the question "Hadoop Reducer Values in Memory?" for more details about object references in the map and reduce methods.
Upvotes: 0