Prabhat Kumar
Prabhat Kumar

Reputation: 35

Unable to get the expected reduced output using Mapreduce in Hadoop

I am trying to learn MapReduce and doing this task.

My input is as below(State, Sport, Amount(in USD)):

California Football 69.09 California Swimming 31.5 Illinois Golf 8.31 Illinois Tennis 15.75 Oklahoma Golf 15.44 Oklahoma Tennis 8.33 Texas Golf 16.71 Texas Swimming 71.59 Washington Football 50.32000000000001

And I am expecting my output such that the output should display which sport is popular in the particular state (depending on the highest sales of sport items). For eg:

California Football 69.09 Illinois Tennis 15.75 Oklahoma Golf 15.44 and so on

Below are my Mapper, Reducer and Driver codes:

Mapper code:

package org.assignment.sports;

import java.io.IOException;


import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class Sports_Mapper2 extends Mapper<LongWritable, Text, Text, Text>{
    public void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException{
        String[] s= value.toString().split(" ");
        String Sport_State = s[0];
        String other = s[1]+" "+s[2];
        context.write(new Text(Sport_State), new Text(other));
    }
}

Reducer Code:

package org.assignment.sports;

import java.io.IOException;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;


public class Sports_Reducer2 extends Reducer<Text, Text, Text, DoubleWritable>{

    private static double MAX=0.00;
    public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException
    {
        //String[] k= values.toString().split(" ");
        for (Text value:values){
            String[] k= value.toString().split(" ");
            DoubleWritable price = new DoubleWritable(Double.parseDouble(k[1]));
        if(price.get()>MAX){
            MAX = price.get();
        }
        else{
            continue;
        }
        String ss = key.toString()+" "+ k[0];
        context.write(new Text(ss), new DoubleWritable(MAX));
        }               
        }

}

Driver Code:

package org.assignment.sports;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class Sports_Driver2 {
    public static void main(String[] args) throws Exception
    {
        Configuration conf = new Configuration();

        Job job = new Job(conf, "Sports_Driver2");

        String[] otherArgs =new GenericOptionsParser(conf, args).getRemainingArgs();

        job.setJarByClass(Sports_Driver2.class);
        job.setMapperClass(Sports_Mapper2.class);
        job.setReducerClass(Sports_Reducer2.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);

        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job,new Path(otherArgs[1]));

        System.exit(job.waitForCompletion(true)? 0: 1);
    }

}

I am getting the output as below:

California Football 69.09 Texas Swimming 71.59

Where am I going wrong? Any help is appreciated

Upvotes: 0

Views: 190

Answers (2)

USB
USB

Reputation: 6139

To take max of each values in Reducer for key you need to keep track of the sport's name aswell. Else it will produce wrong results. Please try below code.

Driver

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class Sports_Driver2 {
    public static void main(String[] args) throws Exception
    {
        Configuration conf = new Configuration();

        FileSystem fs = FileSystem.get(conf);
        Job job = new Job(conf, "Sports_Driver2");

        String[] otherArgs =new GenericOptionsParser(conf, args).getRemainingArgs();

        job.setJarByClass(Sports_Driver2.class);
        job.setMapperClass(Sports_Mapper2.class);
        job.setReducerClass(Sports_Reducer2.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);

        if(fs.exists(new Path(otherArgs[1]))){
            fs.delete(new Path(otherArgs[1]), true);
        }

        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job,new Path(otherArgs[1]));

        System.exit(job.waitForCompletion(true)? 0: 1);
    }

}

Mapper

public class Sports_Mapper2 extends Mapper<LongWritable, Text, Text, Text>{
    public void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException{
        String[] s= value.toString().split(" ");
        String Sport_State = s[0];
        String other = s[1]+" "+s[2];
        context.write(new Text(Sport_State), new Text(other));
    }
}

Reducer

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;


public class Sports_Reducer2 extends Reducer<Text, Text, Text, DoubleWritable>{

    Text keyEmit = new Text();
    public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException
    {
        Map<String,Double> getMax = new HashMap<>();
        String sportName = "";
        for (Text value:values){
            String[] k= value.toString().split(" ");
            sportName = k[0];
            //store values
            getMax.put(sportName, Double.parseDouble(k[1]));
        }   
        /*
         * Get maximum
         */
        Map.Entry<String, Double> maxEntry = null;
        for (Entry<String, Double> entry : getMax.entrySet())
        {
            if (maxEntry == null || entry.getValue().compareTo(maxEntry.getValue()) > 0)
            {
                maxEntry = entry;
            }
        }
        keyEmit.set(key.toString()+" "+maxEntry.getKey());
        context.write(keyEmit, new DoubleWritable(maxEntry.getValue()));
    }

}

Output

California Football 69.09
Illinois Tennis 15.75
Oklahoma Golf   15.44
Texas Swimming  71.59
Washington Football 50.32000000000001

Hope this helps.

Upvotes: 0

byteherder
byteherder

Reputation: 331

The problem is that the MAX value in the Reducer is not being reset after each particular state is written.

String ss = key.toString()+" "+ k[0];
context.write(new Text(ss), new DoubleWritable(MAX));
MAX = 0.00;

Upvotes: 2

Related Questions