user3343543
user3343543

Reputation: 141

MapReduce job doesn't run on complete data

I have dataset(input.csv) containing 35 columns(0-34 positions). If I run my MRv2 program on this then I am getting "ArrayIndexOutOfBoundException".

But, if I run the program on snapshot of dataset containing the same columns, then it runs successfully.

Error

15/07/20 11:05:55 INFO mapreduce.Job: Task Id : attempt_1437379028043_0018_m_000000_2, Status : FAILED
Error: java.lang.ArrayIndexOutOfBoundsException: 34
    at lotus.staging.StageMapper.map(StageMapper.java:88)
    at lotus.staging.StageMapper.map(StageMapper.java:1)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:784)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)

StageMapper

package lotus.staging;

import java.io.IOException;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class StageMapper extends Mapper<LongWritable, Text, Text, Text> {

@Override
public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
    String[] record = value.toString().split(",");

    // Key
    String stg_table = null;

    String report_code = record[0].trim();
    String product_type_description = null;
    String principal_amount = record[1];
    String funded = record[2].trim();
    String facility_id = record[3];
    String loan_id = record[4];

    // Start Date
    String start_date = record[5];

    // Maturity Date
    String end_date = record[6];

    DateFormat df = new SimpleDateFormat("MM/dd/yyyy");
    Date startDate;
    Date endDate;
    long diff;
    long diffDays = 0l;

    try {
        startDate = df.parse(start_date);
        endDate = df.parse(end_date);
        df.format(startDate);
        df.format(endDate);
        diff = endDate.getTime() - startDate.getTime();
        diffDays = diff / (24 * 60 * 60 * 1000);
    } catch (ParseException e) {
        e.printStackTrace();
    }

    // Date Diff
    String date_diff = String.valueOf(diffDays);

    String next_reset_date = record[7];
    String interest_rate = record[8];
    String base_interest_rate = record[9];
    String counterparty_industry_id = record[10];
    String industry_name = record[11];
    String counterparty_id = record[12];
    String counterparty_name = record[13];

    // Bank Number
    String vehicle_code = record[14];

    String vehicle_description = record[15];

    // Branch Number
    String cost_center_code = record[16];

    String branch_borrower_name = record[17];
    String igl_code = record[20];

    // Participation Bal Begin Month
    String participated_amt = record[21];

    String sys_id = record[23];

    // Loan To Value
    String ltv = record[26];

    String accrual_status = record[27];
    String country_code = record[30];
    String fiscal_year = record[31];
    String accounting_period = record[32];
    String accounting_day = record[33];
    String control_category = record[34];

    // CONTROL_CATEGORY_DESC, Secred_BY_Re

    if (report_code.equalsIgnoreCase("1")) {
        product_type_description = "Commercial_Loan";
        stg_table = "stg_lon";
    } else if (report_code.equalsIgnoreCase("2")) {
        product_type_description = "Mortgage_Loan";
        stg_table = "stg_mgt";
    } else if (report_code.equalsIgnoreCase("3")) {
        product_type_description = "Installment_Loan";
        stg_table = "stg_lon";
    } else if (report_code.equalsIgnoreCase("4")) {
        product_type_description = "Revolving Credit";
        stg_table = "stg_lon";
    }

    // Value
    String data = report_code + "," + product_type_description + ","
            + principal_amount + "," + funded + "," + facility_id + ","
            + loan_id + "," + start_date + "," + end_date + "," + date_diff
            + "," + next_reset_date + "," + interest_rate + ","
            + base_interest_rate + "," + counterparty_industry_id + ","
            + industry_name + "," + counterparty_id + ","
            + counterparty_name + "," + vehicle_code + ","
            + vehicle_description + "," + cost_center_code + ","
            + branch_borrower_name + "," + igl_code + ","
            + participated_amt + "," + sys_id + "," + ltv + ","
            + accrual_status + "," + country_code + "," + fiscal_year + ","
            + accounting_period + "," + accounting_day + ","
            + control_category;

    context.write(new Text(stg_table), new Text(data));

} // map() ends
} // Mapper ends

StageReducer

package lotus.staging;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public class StageReducer extends Reducer<Text, Text, Text, Text> {

    private MultipleOutputs mos;

    @Override
    protected void setup(Context context) throws IOException,
            InterruptedException {
        mos = new MultipleOutputs(context);
    }

    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        for (Text value : values) {
            mos.write(key, value, key.toString());
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException,
            InterruptedException {
        mos.close();
    }
}

StageDriver

package lotus.staging;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class StageDriver {
    // Main
    public static void main(String[] args) throws IOException,
            ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "StageDriver");

        // conf.set("mapreduce.textoutputformat.separator", ",");
        // conf.set("mapreduce.output.textoutputformat.separator", ",");
        //conf.set("mapreduce.output.key.field.separator", ",");

        job.setJarByClass(StageDriver.class);
        LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);


        // Mapper and Mapper-Output Key
        job.setMapperClass(StageMapper.class);
        job.setMapOutputKeyClass(Text.class);
        conf.set("mapred.max.split.size", "1020");


        // Reducer and Output Key and Value
        job.setReducerClass(StageReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // Input parameters to execute
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // deleting the output path automatically from hdfs so that we don't
        // have delete it explicitly

        // outputPath.getFileSystem(conf).delete(outputPath);

        // exiting the job only if the flag value becomes false

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

}

Below are the datasets

Snapshot-Dataset Complete-Dataset

Please assist

Upvotes: 1

Views: 140

Answers (1)

patapouf_ai
patapouf_ai

Reputation: 18693

One of the rows in the input.csv is incomplete or has a formatting error (improperly escaped). Try to figure out which row it is. You can catch an exception where this error occurs and print out the row number and fix your data.

try {
CODE WHERE THE OUTOFBOUNDS HAPPENS
}
catch (Exception e) {

    LOG.warn(String.format("Invalid data in row: %d", row));
    System.out.println(String.format("Invalid data in row: %d", row));


}

So in your case that means:

@Override
public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
    String[] record = value.toString().split(",");

    // Key
    String stg_table = null;
try{
    String report_code = record[0].trim();
    String product_type_description = null;
    String principal_amount = record[1];
    String funded = record[2].trim();
    String facility_id = record[3];
    String loan_id = record[4];

    // Start Date
    String start_date = record[5];

    // Maturity Date
    String end_date = record[6];



DateFormat df = new SimpleDateFormat("MM/dd/yyyy");
Date startDate;
Date endDate;
long diff;
long diffDays = 0l;

try {
    startDate = df.parse(start_date);
    endDate = df.parse(end_date);
    df.format(startDate);
    df.format(endDate);
    diff = endDate.getTime() - startDate.getTime();
    diffDays = diff / (24 * 60 * 60 * 1000);
} catch (ParseException e) {
    e.printStackTrace();
}

// Date Diff
String date_diff = String.valueOf(diffDays);

String next_reset_date = record[7];
String interest_rate = record[8];
String base_interest_rate = record[9];
String counterparty_industry_id = record[10];
String industry_name = record[11];
String counterparty_id = record[12];
String counterparty_name = record[13];

// Bank Number
String vehicle_code = record[14];

String vehicle_description = record[15];

// Branch Number
String cost_center_code = record[16];

String branch_borrower_name = record[17];
String igl_code = record[20];

// Participation Bal Begin Month
String participated_amt = record[21];

String sys_id = record[23];

// Loan To Value
String ltv = record[26];

String accrual_status = record[27];
String country_code = record[30];
String fiscal_year = record[31];
String accounting_period = record[32];
String accounting_day = record[33];
String control_category = record[34];

}
    catch (Exception e) {
if {record.size() > 0} {
    // LOG.warn(String.format("Invalid data in row: %s", record[0].trim()));
    System.out.println(String.format("Invalid data in record id: %s", record[0].trim()));}
else{
System.out.println("Empty Record Found");
}
    return void;
}
...

I'm using the record id, because you don't have a row number, but then you can search your that for that record id. And presumably there is at least a first entry in your record. Otherwise you can also check to see if the record is empty.

Upvotes: 1

Related Questions