armmie

Reputation: 25

Hadoop Reducer does not work

I am having trouble with a MapReduce job. My map function runs and produces the desired output, but the reduce function never seems to get called. I am using Text for both keys and values, though I don't think that causes the problem.

The input file is formatted as follows:

2015-06-06,2015-06-06,40.80239868164062,-73.93379211425781,40.72591781616211,-73.98358154296875,7.71,35.72
2015-06-06,2015-06-06,40.71020126342773,-73.96302032470703,40.72967529296875,-74.00226593017578,3.11,2.19
2015-06-05,2015-06-05,40.68404388427734,-73.97597503662109,40.67932510375977,-73.95581817626953,1.13,1.29
...

I want to extract the second date of each line as Text and use it as the key for the reduce. The value for that key is the last two float values of the same line, joined by a space, e.g.:

2015-06-06 7.71 35.72
2015-06-06 9.71 66.72

The value can thus be viewed as two columns separated by a blank.
That part works: I get an output file in which the same key appears many times with different values.

Now I want to sum both float columns for each key, so that after the reduce I get each date as a key with the summed columns as its value.

Problem: reduce does not run.

See the code below:

Mapper

public class Aggregate {

public static class EarnDistMapper extends Mapper<Object, Text, Text, Text> {

    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {

        String [] splitResult = value.toString().split(",");
        String dropOffDate = "";
        String compEarningDist = "";
        //dropoffDate at pos 1 as key 
        dropOffDate = splitResult[1];
        //distance at pos length-2 and earnings at pos length-1 as values separated by space
        compEarningDist = splitResult[splitResult.length -2] + " " + splitResult[splitResult.length-1];

        context.write(new Text(dropOffDate), new Text(compEarningDist));
    }
}

Reducer

public static class EarnDistReducer extends Reducer<Text,Text,Text,Text> {

    public void reduce(Text key, Iterator<Text> values, Context context) throws IOException, InterruptedException {

         float sumDistance = 0;
         float sumEarnings = 0;
         String[] splitArray; 

         while (values.hasNext()){
             splitArray = values.next().toString().split("\\s+");
             //distance first
             sumDistance += Float.parseFloat(splitArray[0]);
             sumEarnings += Float.parseFloat(splitArray[1]);
         }

         //combine result to text

         context.write(key, new Text(Float.toString(sumDistance) + " " + Float.toString(sumEarnings)));
    }
}

Job

public static void main(String[] args) throws Exception{
    // TODO
    Configuration conf = new Configuration();

    Job job = Job.getInstance(conf, "Taxi dropoff");
    job.setJarByClass(Aggregate.class);
    job.setMapperClass(EarnDistMapper.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    job.setCombinerClass(EarnDistReducer.class);
    job.setReducerClass(EarnDistReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}

Thank you for your help!!

Upvotes: 1

Views: 519

Answers (1)

Binary Nerd

Reputation: 13927

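The signature of your reduce method is wrong. You have:

public void reduce(Text key, Iterator<Text> values, Context context) {

It should be:

public void reduce(Text key, Iterable<Text> values, Context context) {

With Iterator as the parameter type, your method does not override Reducer.reduce, so Hadoop never calls it and falls back to the default implementation, which writes every key/value pair through unchanged. Annotating the method with @Override lets the compiler catch this kind of mismatch.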
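Why the Iterator version compiles but is never invoked can be shown without a cluster. The following is a minimal sketch in plain Java, not Hadoop code: BaseReducer is a hypothetical stand-in for Hadoop's Reducer (String replaces Text, a List replaces Context), and run plays the role of the framework, which only knows the base-class signature.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class OverrideDemo {

    // Hypothetical stand-in for Hadoop's Reducer (an assumption for illustration):
    // the default reduce passes every value through unchanged.
    static class BaseReducer {
        public void reduce(String key, Iterable<String> values, List<String> out) {
            for (String v : values) {
                out.add(key + "\t" + v);
            }
        }
    }

    // Same mistake as in the question: Iterator instead of Iterable.
    // This declares a new overload; it does not override BaseReducer.reduce.
    static class BrokenReducer extends BaseReducer {
        public void reduce(String key, Iterator<String> values, List<String> out) {
            int n = 0;
            while (values.hasNext()) {
                values.next();
                n++;
            }
            out.add(key + "\tcount=" + n);
        }
    }

    // Correct parameter type. With @Override, a mismatched signature
    // would be rejected at compile time.
    static class FixedReducer extends BaseReducer {
        @Override
        public void reduce(String key, Iterable<String> values, List<String> out) {
            int n = 0;
            for (String ignored : values) {
                n++;
            }
            out.add(key + "\tcount=" + n);
        }
    }

    // Plays the role of the framework: it calls reduce through the base type.
    static List<String> run(BaseReducer reducer) {
        List<String> out = new ArrayList<>();
        Iterable<String> values = Arrays.asList("7.71 35.72", "3.11 2.19");
        reducer.reduce("2015-06-06", values, out);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run(new BrokenReducer())); // base implementation runs
        System.out.println(run(new FixedReducer()));  // overriding method runs
    }
}
```

With BrokenReducer the two input values come out unchanged, which matches the symptom in the question (many identical keys with unsummed values). With FixedReducer a single aggregated record is produced.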

Upvotes: 3
