Reputation: 21
I created a MapReduce program that takes World Indicator data and shows the results for the specific indicator I want to analyze (e.g. CO2 emissions). The data is laid out in one long line per record that includes the country, code, indicator, year 1 emissions, year 2 emissions, etc. In my mapper I try to keep only the data I want (first, keep the line only if it has the specific indicator), then emit the country and all of its emission levels (held in a string array).
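Each line of the input looks roughly like this (an illustrative example based on the World Bank layout, trimmed; in my actual file there is one column per year):

Aruba,ABW,CO2 emissions (kt),EN.ATM.CO2E.KT,2.04E+03,1.96E+03,...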
My entire program runs, but I noticed from the job counters that it receives Map input records, while there are no Map output records and no Reduce input/output records.
I keep trying to figure out where my logic went wrong, but I'm stumped. Any input is appreciated.
My code is below:
---Mapper---
package org.myorg;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class CO2Mapper extends Mapper<LongWritable, Text, Text, IntWritable>
{
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
    {
        String delims = ",";
        String splitString = value.toString();
        String[] tokens = splitString.split(delims);
        int tokenCount = tokens.length;

        String country = tokens[1];
        String indicator = tokens[3];
        int levels;

        if (indicator.equals("EN.ATM.CO2E.KT"))
        {
            for (int j = 4; j < tokenCount; j++)
            {
                levels = Integer.parseInt(tokens[j]);
                context.write(new Text(country), new IntWritable(levels));
            }
        }
    }
}
---Reducer---
package org.myorg;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class CO2Reducer extends Reducer<Text, IntWritable, Text, IntWritable>
{
    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
    {
        int maxValue = Integer.MIN_VALUE;
        int minValue = Integer.MAX_VALUE;

        for (IntWritable val : values)
        {
            maxValue = Math.max(maxValue, val.get());
            minValue = Math.min(minValue, val.get());
        }

        context.write(key, new IntWritable(maxValue));
        context.write(key, new IntWritable(minValue));
    }
}
---Main---
package org.myorg;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
//import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
public class CO2Levels
{
    public static void main(String[] args) throws Exception
    {
        //with mapreduce
        Configuration conf = new Configuration();
        Job job = new Job(conf, "co2Levels");
        //Job job = new Job();
        job.setJarByClass(CO2Levels.class);
        //job.setJobName("co2Levels");
        job.setMapperClass(CO2Mapper.class);
        job.setReducerClass(CO2Reducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setInputFormatClass(TextInputFormat.class);
        //job.setInputFormatClass(KeyValueTextInputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
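I build the classes into a jar and run the job along these lines (the jar name and paths are just placeholders):

hadoop jar co2levels.jar org.myorg.CO2Levels /user/me/input /user/me/output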
Upvotes: 2
Views: 1347
Reputation: 3154
After analyzing the sample input, it seems I've found the cause of the problem. The following code block in the Mapper is erroneous with respect to the input:

for (int j = 4; j < tokenCount; j++){
    levels = Integer.parseInt(tokens[j]);

From the 5th column onward, all numerical values are in floating-point representation (e.g. '8.44E+03'), even though they are indeed integers. Integer.parseInt therefore throws a NumberFormatException and the job fails. I'm not convinced by the "My entire program runs" statement (check the task logs at the JobTracker). If you're certain that the input will always contain integers, do something like:
levels = (int) Float.parseFloat(tokens[j]);
Otherwise, change the data type of levels to float/double and use FloatWritable/DoubleWritable as the map's output value class, with the related changes in the reducer and the driver.
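For instance, a minimal sketch of the double-based variant (same filtering as your original mapper; only the value type changes):

package org.myorg;

import java.io.IOException;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class CO2Mapper extends Mapper<LongWritable, Text, Text, DoubleWritable>
{
    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
    {
        String[] tokens = value.toString().split(",");
        if (tokens.length > 4 && tokens[3].equals("EN.ATM.CO2E.KT"))
        {
            String country = tokens[1];
            for (int j = 4; j < tokens.length; j++)
            {
                // Double.parseDouble handles scientific notation like 8.44E+03
                double levels = Double.parseDouble(tokens[j]);
                context.write(new Text(country), new DoubleWritable(levels));
            }
        }
    }
}

Remember to also call job.setOutputValueClass(DoubleWritable.class) in the driver and change the reducer to Reducer<Text, DoubleWritable, Text, DoubleWritable>. The empty-field check described below still applies.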
Another problem with the input is the presence of empty fields, which will also produce a NumberFormatException during parsing. Add a check like:

if (tokens[j] == null || tokens[j].trim().isEmpty()){
    continue; // or do the needful, e.g. set levels to 0 or some default value
}
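Putting both fixes together, the loop could look something like this (a sketch, keeping your int-based types):

for (int j = 4; j < tokenCount; j++)
{
    if (tokens[j] == null || tokens[j].trim().isEmpty())
    {
        continue; // skip missing years instead of crashing
    }
    // parse scientific notation such as 8.44E+03, then truncate to int
    levels = (int) Float.parseFloat(tokens[j].trim());
    context.write(new Text(country), new IntWritable(levels));
}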
Hope this will solve the issue. A note on the reducer: at first glance the initializations of maxValue and minValue look suspicious, but since maxValue starts at Integer.MIN_VALUE and minValue at Integer.MAX_VALUE, the comparisons

maxValue = Math.max(maxValue, val.get());
minValue = Math.min(minValue, val.get());

do correctly track the largest and smallest values per key; just be aware that you are writing two output records per country (the max, then the min). Anyway, good luck.
Upvotes: 0
Reputation: 892
From the sample input, I found that the tokens are in the format 6.16E+03, which cannot be parsed as an integer and therefore throws an exception.
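For example:

// Integer.parseInt cannot handle scientific notation:
int bad = Integer.parseInt("6.16E+03");        // throws NumberFormatException
// parsing as a double first and then narrowing works:
int ok = (int) Double.parseDouble("6.16E+03"); // 6160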
Also, if you want to check where your System.out.println() output goes, check this
Upvotes: 1
Reputation: 490
In your main class you are not importing your map and reduce classes. Add the following to main:
import org.myorg.CO2Mapper;
import org.myorg.CO2Reducer;
Upvotes: 0