Reputation: 35
I am trying to learn MapReduce and am working on this task.
My input is as below (State, Sport, Amount in USD):
California Football 69.09
California Swimming 31.5
Illinois Golf 8.31
Illinois Tennis 15.75
Oklahoma Golf 15.44
Oklahoma Tennis 8.33
Texas Golf 16.71
Texas Swimming 71.59
Washington Football 50.32000000000001
I expect the output to show which sport is the most popular in each state (based on the highest sales of sports items). For example:
California Football 69.09
Illinois Tennis 15.75
Oklahoma Golf 15.44
and so on
Below are my Mapper, Reducer and Driver codes:
Mapper code:
package org.assignment.sports;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class Sports_Mapper2 extends Mapper<LongWritable, Text, Text, Text>{
public void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException{
String[] s= value.toString().split(" ");
String Sport_State = s[0];
String other = s[1]+" "+s[2];
context.write(new Text(Sport_State), new Text(other));
}
}
Reducer Code:
package org.assignment.sports;
import java.io.IOException;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
// Reducer that receives, for one key, values of the form "<sport> <amount>"
// and writes "<key> <sport>" together with an amount as a DoubleWritable.
public class Sports_Reducer2 extends Reducer<Text, Text, Text, DoubleWritable>{
// Running maximum. It is static, starts at 0.00, and is only ever raised inside
// reduce(); nothing in this class sets it back, so the value left by one
// reduce() call is still in place when the next call starts.
private static double MAX=0.00;
// Called with one key and all values for that key. Each value is split on a
// single space: k[0] is used as the sport name and k[1] is parsed as a double.
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException
{
//String[] k= values.toString().split(" ");
for (Text value:values){
String[] k= value.toString().split(" ");
DoubleWritable price = new DoubleWritable(Double.parseDouble(k[1]));
// Only amounts strictly greater than the current MAX get past this check.
if(price.get()>MAX){
MAX = price.get();
}
else{
// Not a new maximum: skip the write below and move on to the next value.
continue;
}
// NOTE(review): this write is inside the loop, so a record is emitted every
// time a value raises MAX, not once per key. Because MAX also carries over
// between reduce() calls, a later key produces output only if one of its
// amounts exceeds the largest amount seen for all earlier keys.
String ss = key.toString()+" "+ k[0];
context.write(new Text(ss), new DoubleWritable(MAX));
}
}
}
Driver Code:
package org.assignment.sports;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
/**
 * Driver that configures and submits the "Sports_Driver2" MapReduce job.
 *
 * <p>After Hadoop's generic options have been consumed, two arguments are
 * required: the input path and the output path.
 */
public class Sports_Driver2 {
    /**
     * Builds the job from the command line, runs it, and exits with status 0
     * on success, 1 on job failure, or 2 when the paths are missing.
     *
     * @param args generic Hadoop options followed by the input and output paths
     * @throws Exception if job setup or submission fails
     */
    public static void main(String[] args) throws Exception
    {
        Configuration conf = new Configuration();
        // Parse the generic options BEFORE creating the Job: the Job keeps its
        // own copy of the configuration, so options applied to conf afterwards
        // would never reach the job.
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 2) {
            System.err.println("Usage: Sports_Driver2 [generic options] <input path> <output path>");
            System.exit(2);
        }

        // Newer Hadoop releases deprecate this constructor in favour of
        // Job.getInstance(conf, name); the constructor is kept here because it
        // is available across Hadoop versions.
        Job job = new Job(conf, "Sports_Driver2");
        job.setJarByClass(Sports_Driver2.class);
        job.setMapperClass(Sports_Mapper2.class);
        job.setReducerClass(Sports_Reducer2.class);

        // The mapper emits Text/Text; the reducer emits Text/DoubleWritable.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);

        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
I am getting the output as below:
California Football 69.09
Texas Swimming 71.59
Where am I going wrong? Any help is appreciated.
Upvotes: 0
Views: 190
Reputation: 6139
To take the maximum of the values for each key in the Reducer, you need to keep track of the sport's name as well; otherwise it will produce wrong results. Please try the code below.
Driver
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
/**
 * Driver that configures and submits the "Sports_Driver2" MapReduce job.
 *
 * <p>After Hadoop's generic options have been consumed, two arguments are
 * required: the input path and the output path. An existing output path is
 * deleted recursively before the job is submitted.
 */
public class Sports_Driver2 {
    /**
     * Builds the job from the command line, runs it, and exits with status 0
     * on success, 1 on job failure, or 2 when the paths are missing.
     *
     * @param args generic Hadoop options followed by the input and output paths
     * @throws Exception if job setup, output cleanup, or submission fails
     */
    public static void main(String[] args) throws Exception
    {
        Configuration conf = new Configuration();
        // Parse the generic options BEFORE creating the Job or touching the
        // file system: the Job keeps its own copy of the configuration, so
        // options applied to conf afterwards would never reach the job.
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 2) {
            System.err.println("Usage: Sports_Driver2 [generic options] <input path> <output path>");
            System.exit(2);
        }
        Path inputPath = new Path(otherArgs[0]);
        Path outputPath = new Path(otherArgs[1]);

        // Newer Hadoop releases deprecate this constructor in favour of
        // Job.getInstance(conf, name); the constructor is kept here because it
        // is available across Hadoop versions.
        Job job = new Job(conf, "Sports_Driver2");
        job.setJarByClass(Sports_Driver2.class);
        job.setMapperClass(Sports_Mapper2.class);
        job.setReducerClass(Sports_Reducer2.class);

        // The mapper emits Text/Text; the reducer emits Text/DoubleWritable.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);

        // Resolve the file system from the output path itself so the check
        // also works when the path is not on the default file system.
        // WARNING: this removes any existing output directory recursively.
        FileSystem fs = outputPath.getFileSystem(conf);
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }

        FileInputFormat.addInputPath(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
Mapper
public class Sports_Mapper2 extends Mapper<LongWritable, Text, Text, Text>{
public void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException{
String[] s= value.toString().split(" ");
String Sport_State = s[0];
String other = s[1]+" "+s[2];
context.write(new Text(Sport_State), new Text(other));
}
}
Reducer
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class Sports_Reducer2 extends Reducer<Text, Text, Text, DoubleWritable>{
Text keyEmit = new Text();
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException
{
Map<String,Double> getMax = new HashMap<>();
String sportName = "";
for (Text value:values){
String[] k= value.toString().split(" ");
sportName = k[0];
//store values
getMax.put(sportName, Double.parseDouble(k[1]));
}
/*
* Get maximum
*/
Map.Entry<String, Double> maxEntry = null;
for (Entry<String, Double> entry : getMax.entrySet())
{
if (maxEntry == null || entry.getValue().compareTo(maxEntry.getValue()) > 0)
{
maxEntry = entry;
}
}
keyEmit.set(key.toString()+" "+maxEntry.getKey());
context.write(keyEmit, new DoubleWritable(maxEntry.getValue()));
}
}
Output
California Football 69.09
Illinois Tennis 15.75
Oklahoma Golf 15.44
Texas Swimming 71.59
Washington Football 50.32000000000001
Hope this helps.
Upvotes: 0
Reputation: 331
The problem is that the MAX value in the Reducer is a static field that is not being reset after each particular state is written, so the maximum found for one state carries over to the next.
// Build the output key from the reducer key and the sport name in k[0],
// then write it with the current maximum.
String ss = key.toString()+" "+ k[0];
context.write(new Text(ss), new DoubleWritable(MAX));
// Reset the running maximum so it does not carry over to the next key.
// NOTE(review): in the question's reducer the two statements above sit inside
// the for loop over the values. Resetting MAX at that spot would clear it
// after every write, so presumably the write and the reset are meant to run
// once per key, after the loop has finished. Confirm the placement.
MAX = 0.00;
Upvotes: 2