Sashimi

Reputation: 21

MapReduce Program producing empty output

I created a MapReduce program that takes World Indicator data and shows the results for the specific indicator I want to analyze (e.g. CO2 emissions). The data is laid out as one long line per record that includes the country, code, indicator, year 1 emission, year 2 emission, etc. In my mapper I try to keep only the data I want (first, keep the line only if it has the specific indicator), then keep the country and all the emission levels (in a string array).
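For example, a line looks something like this (placeholder names and made-up numbers; my mapper reads the country from column 2 and the indicator code from column 4, with the emission values starting at column 5):

    CountryName,CountryCode,IndicatorName,EN.ATM.CO2E.KT,8.44E+03,8.47E+03,,8.51E+03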

My entire program runs, but the job counters show Map input records while the Map output records and Reduce input/output records are all zero.

I keep trying to figure out where my logic went wrong, but I'm stumped. Any input is appreciated.

My code is below:

---Mapper---

package org.myorg;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class CO2Mapper extends Mapper <LongWritable, Text, Text, IntWritable>
{
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
    {
        String delims = ",";
        String splitString = value.toString();

        String[] tokens = splitString.split(delims);

        int tokenCount = tokens.length;
        String country = tokens[1]; 
        String indicator = tokens[3];
        int levels;

        if(indicator.equals("EN.ATM.CO2E.KT"))
        {   
            for (int j = 4; j < tokenCount; j++)
            {
                levels = Integer.parseInt(tokens[j]);
                context.write(new Text(country), new IntWritable(levels));
            }
        }
    } 
}

---Reducer---

package org.myorg;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;


public class CO2Reducer extends Reducer<Text, IntWritable, Text, IntWritable>
{
    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
    {
        int maxValue = Integer.MIN_VALUE;
        int minValue = Integer.MAX_VALUE;
        for(IntWritable val : values)
        {
            maxValue = Math.max(maxValue, val.get());
            minValue = Math.min(minValue, val.get());
        }

        context.write(key, new IntWritable(maxValue));
        context.write(key, new IntWritable(minValue));
    }
}

---Main---

package org.myorg;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
//import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;

public class CO2Levels 
{

    public static void main(String[] args) throws Exception  
    {    
        //with mapreduce

        Configuration conf = new Configuration();
        Job job = new Job(conf, "co2Levels");

        //Job job = new Job();

        job.setJarByClass(CO2Levels.class);
        //job.setJobName("co2Levels");
        job.setMapperClass(CO2Mapper.class);
        job.setReducerClass(CO2Reducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setInputFormatClass(TextInputFormat.class);
        //job.setInputFormatClass(KeyValueTextInputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);

    }
}

Upvotes: 2

Views: 1347

Answers (3)

blackSmith

Reputation: 3154

After analyzing the sample input, it seems that I've found the cause of the problem. The following code block in the Mapper is erroneous with respect to the input:

    for (int j = 4; j < tokenCount; j++){
        levels = Integer.parseInt(tokens[j]);

From the 5th column onward, all the numerical values are in floating-point notation (e.g. '8.44E+03'), even though they are indeed integers. Thus Integer.parseInt throws a NumberFormatException and the job fails. I'm not convinced by the "My entire program runs" statement (check the task logs at the JobTracker). If you're certain that the input will always contain integral values, do something like:

  levels = (int) Float.parseFloat(tokens[j]); 

Otherwise, change the data type of levels to float/double and use FloatWritable/DoubleWritable as the map output value class, with the corresponding changes in the reducer.
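A minimal sketch of that double-based route (CO2DoubleMapper is just my name for it; it also skips blank fields, which I come back to below):

    import java.io.IOException;

    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class CO2DoubleMapper extends Mapper<LongWritable, Text, Text, DoubleWritable>
    {
        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
        {
            String[] tokens = value.toString().split(",");
            // guard against short lines before indexing into the array
            if (tokens.length > 4 && tokens[3].equals("EN.ATM.CO2E.KT"))
            {
                for (int j = 4; j < tokens.length; j++)
                {
                    if (tokens[j].trim().isEmpty())
                        continue; // skip empty year columns (see below)
                    // parseDouble handles scientific notation such as 8.44E+03
                    context.write(new Text(tokens[1]), new DoubleWritable(Double.parseDouble(tokens[j])));
                }
            }
        }
    }

You'd also have to switch the reducer generics to DoubleWritable and call job.setOutputValueClass(DoubleWritable.class) in the driver.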

Another problem with the input is the presence of empty fields, which will also produce a NumberFormatException during parsing. Add a check like:

  if (tokens[j] == null || tokens[j].trim().isEmpty()){
         continue; // or do the needful, e.g. set levels to 0 or some default value
  }
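Putting both fixes together while keeping your int-based types, the loop in your mapper would look something like this:

    for (int j = 4; j < tokenCount; j++)
    {
        // empty year columns would blow up the parse, so skip them
        if (tokens[j] == null || tokens[j].trim().isEmpty())
            continue;
        // parse the scientific notation as a float, then truncate to int
        levels = (int) Float.parseFloat(tokens[j]);
        context.write(new Text(country), new IntWritable(levels));
    }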

Hope this will solve the issue. One remark on the reducer: the initializations are actually the right way around (maxValue starts at Integer.MIN_VALUE, minValue at Integer.MAX_VALUE), so

 maxValue = Math.max(maxValue, val.get());
 minValue = Math.min(minValue, val.get());

will correctly track the largest and smallest levels. What I couldn't follow is the output: both values are written under the same country key, so the max and min lines are indistinguishable in the result. Anyway, good luck.

Upvotes: 0

GKV

Reputation: 892

From the sample input I found that the tokens are in the format 6.16E+03, which throws an exception because it cannot be parsed as an integer.

Also, if you want to check where your System.out.println() output goes, check this.
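A counter is another option: it shows up straight in the job's counter summary instead of task stdout. A rough sketch, guarding the parse inside the mapper loop ("CO2"/"BAD_VALUES" are just names I picked):

    try {
        levels = Integer.parseInt(tokens[j]);
        context.write(new Text(country), new IntWritable(levels));
    } catch (NumberFormatException e) {
        // increments the CO2 / BAD_VALUES counter shown with the job's counters
        context.getCounter("CO2", "BAD_VALUES").increment(1);
    }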

Upvotes: 1

Radek Tomšej

Reputation: 490

In your main you are not importing your map and reduce classes. Add the following to main:

import org.myorg.CO2Mapper;
import org.myorg.CO2Reducer;

Upvotes: 0
