Reputation: 25
I am having trouble with a MapReduce job. My map function runs and produces the desired output, but the reduce function never seems to be called. I am using Text for both keys and values, but I don't think that causes the problem.
The input file is formatted as follows:
I want to extract the second date in each line as Text and use it as the key for the reduce. The value for each key is a combination of the last two float values in the same line, for example:
2015-06-06 7.71 35.72
2015-06-06 9.71 66.72
The value part can therefore be viewed as two columns separated by a blank.
That part works: I get an output file with many identical keys but different values.
Now I want to sum both float columns for each key, so that after the reduce I get each date as a key with the summed columns as its value.
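To make the intended result concrete, here is a minimal plain-Java sketch (no Hadoop involved) of the aggregation described above. The class name `ExpectedAggregation` and the method `sumByKey` are hypothetical and only illustrate the expected per-date sums, assuming each map output line has the form "date distance earnings" separated by whitespace.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: illustrates the desired per-key sums outside of Hadoop.
class ExpectedAggregation {

    // Groups lines of the form "key col1 col2" by key and sums both float columns.
    static Map<String, float[]> sumByKey(String[] mapOutputLines) {
        Map<String, float[]> sums = new LinkedHashMap<>();
        for (String line : mapOutputLines) {
            String[] parts = line.trim().split("\\s+");
            // acc[0] accumulates the first column, acc[1] the second
            float[] acc = sums.computeIfAbsent(parts[0], k -> new float[2]);
            acc[0] += Float.parseFloat(parts[1]);
            acc[1] += Float.parseFloat(parts[2]);
        }
        return sums;
    }

    public static void main(String[] args) {
        // The two sample lines from the question
        String[] lines = {
            "2015-06-06 7.71 35.72",
            "2015-06-06 9.71 66.72"
        };
        for (Map.Entry<String, float[]> e : sumByKey(lines).entrySet()) {
            System.out.println(e.getKey() + " " + e.getValue()[0] + " " + e.getValue()[1]);
        }
    }
}
```

For the two sample lines this should print a single line for 2015-06-06 with both columns summed.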
Problem: reduce does not run.
See the code below:
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Aggregate {
    public static class EarnDistMapper extends Mapper<Object, Text, Text, Text> {
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String[] splitResult = value.toString().split(",");
            String dropOffDate = "";
            String compEarningDist = "";
            //dropoffDate at pos 1 as key
            dropOffDate = splitResult[1];
            //distance at pos length-2 and earnings at pos length-1 as values separated by space
            compEarningDist = splitResult[splitResult.length - 2] + " " + splitResult[splitResult.length - 1];
            context.write(new Text(dropOffDate), new Text(compEarningDist));
        }
    }

    public static class EarnDistReducer extends Reducer<Text, Text, Text, Text> {
        public void reduce(Text key, Iterator<Text> values, Context context) throws IOException, InterruptedException {
            float sumDistance = 0;
            float sumEarnings = 0;
            String[] splitArray;
            while (values.hasNext()) {
                splitArray = values.next().toString().split("\\s+");
                //distance first
                sumDistance += Float.parseFloat(splitArray[0]);
                sumEarnings += Float.parseFloat(splitArray[1]);
            }
            //combine result to text
            context.write(key, new Text(Float.toString(sumDistance) + " " + Float.toString(sumEarnings)));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Taxi dropoff");
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
Thank you for your help!!
Upvotes: 1
Views: 521
Reputation: 13937
The signature of your reduce method is wrong. You have:
public void reduce(Text key, Iterator<Text> values, Context context) {
It should be:
public void reduce(Text key, Iterable<Text> values, Context context) {
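The reason this matters: a method taking `Iterator<Text>` has a different parameter type, so it overloads `Reducer.reduce` instead of overriding it. The framework keeps calling the inherited default, which writes every value through unchanged. That matches the output with many identical keys. The following is a minimal plain-Java sketch of the same pitfall without Hadoop; `BaseReducer`, `WrongReducer`, and `RightReducer` are hypothetical stand-ins, not Hadoop classes.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for a framework base class: run() always calls reduce(String, Iterable).
class BaseReducer {
    protected String reduce(String key, Iterable<String> values) {
        return "identity"; // default behaviour when nothing overrides it
    }

    public final String run(String key, List<String> values) {
        return reduce(key, values);
    }
}

class WrongReducer extends BaseReducer {
    // Iterator is not Iterable, so this is a new overload that run() never calls.
    public String reduce(String key, Iterator<String> values) {
        return "custom";
    }
}

class RightReducer extends BaseReducer {
    // Same parameter types as the base method, so this is a real override.
    @Override
    protected String reduce(String key, Iterable<String> values) {
        return "custom";
    }
}

class OverrideDemo {
    public static void main(String[] args) {
        List<String> values = Arrays.asList("7.71 35.72", "9.71 66.72");
        System.out.println(new WrongReducer().run("2015-06-06", values)); // prints "identity"
        System.out.println(new RightReducer().run("2015-06-06", values)); // prints "custom"
    }
}
```

Adding `@Override` to your reduce method is a cheap safeguard: with the `Iterator` signature the compiler rejects the annotation, so the mistake shows up at compile time. After switching to `Iterable`, the `while (values.hasNext())` loop also needs to become a for-each loop over `values`.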
Upvotes: 3