Pat Mulvihill

Reputation: 53

Hadoop map-reducer is not writing any output

I am working on a three-node Hadoop MapReduce problem that is intended to take a 200,000-line input.csv file with Dates and Point Values as headers (gist of a 25-line sample: https://gist.githubusercontent.com/PatMulvihill/63effd90411efe858330b54a4111fadb/raw/4033695ba5ca2f439cfd1512358425643807d83b/input.csv).

The program should find any Point Value that is not one of the following: 200, 400, 600, 800, 1000, 1200, 1600, or 2000. That point value should be the value, and the key should be the year from the date that precedes it. For instance, given the data

2000-05-25,400
2001-10-12,650
2001-04-09,700

the key-value pairs that should be sent to the reducer are <2001, 650> and <2001, 700>. The reducer should then take the average of all values in each given year and write those key-value pairs to the HDFS /out path I have specified.

The program compiles fine, but never actually writes anything to output. I want to know why and what I can do to fix it. The full code is as follows:

import java.io.IOException;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class JeopardyMR {

    public static class SplitterMapper extends Mapper<Object, Text, Text, IntWritable> {

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            // Convert the CSVString (of type Text) to a string
            String CSVString = value.toString();
            // Split the string at each comma, creating a list with the different attributes in each index.
            // Sometimes the questions will be split into multiple elements because they contain commas, but for
            // the way we will be parsing the CSVs, it doesn't matter.
            List<String> items = Arrays.asList(CSVString.split("\\s*,\\s*"));
            // Loop through all the elements in the CSV.
            // Start i at 3 to ensure that you do not parse a point value that has a year absent from the data set.
            // We can end the loop at items.size() without truncating the last 3 items because if we have a point
            // value, we know that the corresponding year is in the items before it, not after it.
            // We will miss 1 or 2 data points because of this, but it shouldn't matter too much given the magnitude
            // of our data set and the fact that a value has a low probability of actually being a daily double wager.
            for (int i = 3; i < items.size(); i++) {
                // We want a String version of the item being evaluated so that we can see if it matches the regex
                String item = items.get(i);
                if (item.matches("^\\d{4}\\-(0?[1-9]|1[012])\\-(0?[1-9]|[12][0-9]|3[01])$")) {
                    // Make sure that we don't get an out-of-bounds error when trying to access the next item
                    if (i + 1 >= items.size()) {
                        break;
                    } else {
                        // the wagerStr should always be the item after a valid air date
                        String wagerStr = items.get(i + 1);
                        int wager = Integer.parseInt(wagerStr);
                        // if a wager isn't one of the following values, assume it is a daily double wager
                        if (wager != 200 && wager != 400 && wager != 600 && wager != 800 && wager != 1000 && wager != 1200 && wager != 1600 && wager != 2000) {
                            // if we know that a point value is in fact a daily double wager, find the year the
                            // daily double happened; the year will always be the first 4 digits of a valid date
                            // formatted YYYY-MM-DD
                            char[] airDateChars = item.toCharArray();
                            String year = "" + airDateChars[0] + airDateChars[1] + airDateChars[2] + airDateChars[3];

                            // output the following key-value pair: <year, wager>
                            context.write(new Text(year), new IntWritable(wager));
                        }
                    }
                }
            }
        }
    }

    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0, count = 0;
            for (IntWritable val : values) {
                sum += val.get();
                count++;
            }
            int avg = sum / count;
            result.set(avg);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "jeopardy daily double wagers by year");
        job.setJarByClass(JeopardyMR.class);
        job.setMapperClass(SplitterMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

The terminal output from the successful compile can be found here: https://gist.github.com/PatMulvihill/40b3207fe8af8de0b91afde61305b187 I am very new to Hadoop MapReduce and am probably making a very silly mistake. I based this code on the example found here: https://hadoop.apache.org/docs/stable/hadoop-mapreduce-client/hadoop-mapreduce-client-core/MapReduceTutorial.html Please let me know if I am missing any useful information. Any help would be appreciated! Thank you.

Upvotes: 0

Views: 1845

Answers (2)

Pat Mulvihill

Reputation: 53

I actually fixed this issue by converting my .csv file to a .txt file. That isn't a real solution to the problem, but it is what made my code work, and now I can move on to understanding why it was an issue. Plus, this might help someone in the future!

Upvotes: 0

vahid

Reputation: 456

I checked, and I think items.size() is two. As you know, the input to map is the lines of the file, and the map task executes the map function once for each line. Once a single line is split on the commas, the size of items becomes 2, while your for loop only runs when items.size() is greater than 3, so the loop body never executes. You can check the map output records/bytes counters to see whether any data was written at all (see the counter sketch after the code below).

EDIT: replace your map code with this:

public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
    String CSVString = value.toString();
    // Each input line is a single "date,value" record, so the split yields exactly two fields
    String[] yearsValue = CSVString.split("\\s*,\\s*");
    // The numeric check skips the header row (and any malformed line) instead of crashing in Integer.parseInt
    if (yearsValue.length == 2 && yearsValue[1].matches("\\d+")) {
        int wager = Integer.parseInt(yearsValue[1]);
        if (wager != 200 && wager != 400 && wager != 600 && wager != 800 && wager != 1000 && wager != 1200 && wager != 1600 && wager != 2000) {
            // the year is the first 4 characters of a date formatted YYYY-MM-DD
            char[] airDateChars = yearsValue[0].toCharArray();
            String year = "" + airDateChars[0] + airDateChars[1] + airDateChars[2] + airDateChars[3];
            context.write(new Text(year), new IntWritable(wager));
        }
    } else {
        // log any line that doesn't parse so it shows up in the task's stdout log
        System.out.println(CSVString);
    }
}
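
To verify this, you can read the job's built-in counters from the driver after the job finishes: MAP_OUTPUT_RECORDS stays at 0 whenever map() returns without ever calling context.write(). A minimal sketch, assuming the Job object from the question's main method plus imports for org.apache.hadoop.mapreduce.Counters and org.apache.hadoop.mapreduce.TaskCounter:

boolean success = job.waitForCompletion(true);
// Built-in framework counters: 0 map output records means the mapper
// never emitted a key-value pair, so the reducer had nothing to average.
Counters counters = job.getCounters();
long mapOut = counters.findCounter(TaskCounter.MAP_OUTPUT_RECORDS).getValue();
long reduceOut = counters.findCounter(TaskCounter.REDUCE_OUTPUT_RECORDS).getValue();
System.out.println("map output records = " + mapOut);
System.out.println("reduce output records = " + reduceOut);
System.exit(success ? 0 : 1);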

Upvotes: 1
