Reputation: 141
I have a dataset (input.csv) containing 35 columns (positions 0-34). When I run my MRv2 program on it, I get an "ArrayIndexOutOfBoundsException".
But if I run the program on a snapshot of the dataset containing the same columns, it runs successfully.
Error
15/07/20 11:05:55 INFO mapreduce.Job: Task Id : attempt_1437379028043_0018_m_000000_2, Status : FAILED
Error: java.lang.ArrayIndexOutOfBoundsException: 34
at lotus.staging.StageMapper.map(StageMapper.java:88)
at lotus.staging.StageMapper.map(StageMapper.java:1)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:784)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
StageMapper
package lotus.staging;

import java.io.IOException;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class StageMapper extends Mapper<LongWritable, Text, Text, Text> {

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] record = value.toString().split(",");
        // Key
        String stg_table = null;
        String report_code = record[0].trim();
        String product_type_description = null;
        String principal_amount = record[1];
        String funded = record[2].trim();
        String facility_id = record[3];
        String loan_id = record[4];
        // Start Date
        String start_date = record[5];
        // Maturity Date
        String end_date = record[6];
        DateFormat df = new SimpleDateFormat("MM/dd/yyyy");
        Date startDate;
        Date endDate;
        long diff;
        long diffDays = 0L;
        try {
            startDate = df.parse(start_date);
            endDate = df.parse(end_date);
            df.format(startDate);
            df.format(endDate);
            diff = endDate.getTime() - startDate.getTime();
            diffDays = diff / (24 * 60 * 60 * 1000);
        } catch (ParseException e) {
            e.printStackTrace();
        }
        // Date Diff
        String date_diff = String.valueOf(diffDays);
        String next_reset_date = record[7];
        String interest_rate = record[8];
        String base_interest_rate = record[9];
        String counterparty_industry_id = record[10];
        String industry_name = record[11];
        String counterparty_id = record[12];
        String counterparty_name = record[13];
        // Bank Number
        String vehicle_code = record[14];
        String vehicle_description = record[15];
        // Branch Number
        String cost_center_code = record[16];
        String branch_borrower_name = record[17];
        String igl_code = record[20];
        // Participation Bal Begin Month
        String participated_amt = record[21];
        String sys_id = record[23];
        // Loan To Value
        String ltv = record[26];
        String accrual_status = record[27];
        String country_code = record[30];
        String fiscal_year = record[31];
        String accounting_period = record[32];
        String accounting_day = record[33];
        String control_category = record[34];
        // CONTROL_CATEGORY_DESC, Secred_BY_Re
        if (report_code.equalsIgnoreCase("1")) {
            product_type_description = "Commercial_Loan";
            stg_table = "stg_lon";
        } else if (report_code.equalsIgnoreCase("2")) {
            product_type_description = "Mortgage_Loan";
            stg_table = "stg_mgt";
        } else if (report_code.equalsIgnoreCase("3")) {
            product_type_description = "Installment_Loan";
            stg_table = "stg_lon";
        } else if (report_code.equalsIgnoreCase("4")) {
            product_type_description = "Revolving Credit";
            stg_table = "stg_lon";
        }
        // Note: stg_table stays null for any other report_code, which would
        // make new Text(stg_table) below throw a NullPointerException.
        // Value
        String data = report_code + "," + product_type_description + ","
                + principal_amount + "," + funded + "," + facility_id + ","
                + loan_id + "," + start_date + "," + end_date + "," + date_diff
                + "," + next_reset_date + "," + interest_rate + ","
                + base_interest_rate + "," + counterparty_industry_id + ","
                + industry_name + "," + counterparty_id + ","
                + counterparty_name + "," + vehicle_code + ","
                + vehicle_description + "," + cost_center_code + ","
                + branch_borrower_name + "," + igl_code + ","
                + participated_amt + "," + sys_id + "," + ltv + ","
                + accrual_status + "," + country_code + "," + fiscal_year + ","
                + accounting_period + "," + accounting_day + ","
                + control_category;
        context.write(new Text(stg_table), new Text(data));
    } // map() ends
} // Mapper ends
StageReducer
package lotus.staging;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public class StageReducer extends Reducer<Text, Text, Text, Text> {

    private MultipleOutputs<Text, Text> mos;

    @Override
    protected void setup(Context context) throws IOException,
            InterruptedException {
        mos = new MultipleOutputs<Text, Text>(context);
    }

    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        for (Text value : values) {
            // Write each record to a file named after its staging table
            mos.write(key, value, key.toString());
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException,
            InterruptedException {
        mos.close();
    }
}
StageDriver
package lotus.staging;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class StageDriver {

    // Main
    public static void main(String[] args) throws IOException,
            ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "StageDriver");
        // conf.set("mapreduce.textoutputformat.separator", ",");
        // conf.set("mapreduce.output.textoutputformat.separator", ",");
        // conf.set("mapreduce.output.key.field.separator", ",");
        job.setJarByClass(StageDriver.class);
        LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);

        // Mapper and Mapper-Output Key
        job.setMapperClass(StageMapper.class);
        job.setMapOutputKeyClass(Text.class);
        // Job.getInstance() copies conf, so setting properties on conf after
        // this point has no effect; set them on the job's own configuration.
        // ("mapred.max.split.size" is the deprecated name of
        // "mapreduce.input.fileinputformat.split.maxsize".)
        job.getConfiguration().set("mapred.max.split.size", "1020");

        // Reducer and Output Key and Value
        job.setReducerClass(StageReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // Input parameters to execute
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // deleting the output path automatically from hdfs so that we don't
        // have to delete it explicitly
        // outputPath.getFileSystem(conf).delete(outputPath);

        // exiting the job only if the flag value becomes false
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
Below are the datasets:
Snapshot-Dataset | Complete-Dataset
Please assist.
Upvotes: 1
Views: 140
Reputation: 18693
One of the rows in input.csv is incomplete or has a formatting error (an improperly escaped delimiter, for example). Try to figure out which row it is: you can catch the exception where the error occurs, print out an identifier for the row, and fix your data.
try {
    // CODE WHERE THE OUTOFBOUNDS HAPPENS
} catch (Exception e) {
    LOG.warn(String.format("Invalid data in row: %d", row));
    System.out.println(String.format("Invalid data in row: %d", row));
}
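There is no ready-made row counter inside a Mapper, but with the default TextInputFormat the LongWritable key passed to map() is the byte offset of the line in the file, which serves the same purpose. A minimal sketch of how the catch block could report it (the offset-based message is my addition, not part of the original code):

@Override
public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
    String[] record = value.toString().split(",");
    try {
        // ... field extraction as in StageMapper ...
        String control_category = record[34];
    } catch (ArrayIndexOutOfBoundsException e) {
        // With TextInputFormat, key is the byte offset of this line in the
        // file, so it pinpoints the malformed row in input.csv.
        System.out.println(String.format(
                "Invalid data at byte offset %d: %s", key.get(), value));
        return; // skip the malformed record instead of failing the task
    }
    // ... build and write the output record ...
}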
So in your case that means:
@Override
public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
    String[] record = value.toString().split(",");
    // Key
    String stg_table = null;
    try {
        String report_code = record[0].trim();
        String product_type_description = null;
        String principal_amount = record[1];
        String funded = record[2].trim();
        String facility_id = record[3];
        String loan_id = record[4];
        // Start Date
        String start_date = record[5];
        // Maturity Date
        String end_date = record[6];
        DateFormat df = new SimpleDateFormat("MM/dd/yyyy");
        Date startDate;
        Date endDate;
        long diff;
        long diffDays = 0L;
        try {
            startDate = df.parse(start_date);
            endDate = df.parse(end_date);
            df.format(startDate);
            df.format(endDate);
            diff = endDate.getTime() - startDate.getTime();
            diffDays = diff / (24 * 60 * 60 * 1000);
        } catch (ParseException e) {
            e.printStackTrace();
        }
        // Date Diff
        String date_diff = String.valueOf(diffDays);
        String next_reset_date = record[7];
        String interest_rate = record[8];
        String base_interest_rate = record[9];
        String counterparty_industry_id = record[10];
        String industry_name = record[11];
        String counterparty_id = record[12];
        String counterparty_name = record[13];
        // Bank Number
        String vehicle_code = record[14];
        String vehicle_description = record[15];
        // Branch Number
        String cost_center_code = record[16];
        String branch_borrower_name = record[17];
        String igl_code = record[20];
        // Participation Bal Begin Month
        String participated_amt = record[21];
        String sys_id = record[23];
        // Loan To Value
        String ltv = record[26];
        String accrual_status = record[27];
        String country_code = record[30];
        String fiscal_year = record[31];
        String accounting_period = record[32];
        String accounting_day = record[33];
        String control_category = record[34];
    } catch (Exception e) {
        if (record.length > 0) {
            // LOG.warn(String.format("Invalid data in record id: %s", record[0].trim()));
            System.out.println(String.format(
                    "Invalid data in record id: %s", record[0].trim()));
        } else {
            System.out.println("Empty Record Found");
        }
        return;
    }
    // Note: fields declared inside the try block go out of scope here;
    // declare them before the try if the rest of map() needs them.
...
I'm using the record id because you don't have a row number; you can then search your data for that record id. And presumably there is at least a first entry in each record; otherwise you can also check whether the record is empty, as shown above.
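One more thing worth checking, given that the snapshot works while the complete file fails: Java's String.split(String) drops trailing empty strings, so a row that ends in empty columns produces an array with fewer than 35 elements even if all 34 commas are present. Passing a negative limit preserves them. A minimal sketch (not from the original code) showing the difference:

String row = "1,100,Y,,";        // row ending in two empty columns
String[] a = row.split(",");     // length 3: trailing empties dropped
String[] b = row.split(",", -1); // length 5: trailing empties kept
System.out.println(a.length + " vs " + b.length); // prints "3 vs 5"

If the complete input.csv contains rows like this, using split(",", -1) in StageMapper would avoid the ArrayIndexOutOfBoundsException for them.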
Upvotes: 1