Shrey_Shrma
Shrey_Shrma

Reputation: 11

Type mismatch in key from map: expected org.apache.hadoop.io.Text, received org.apache.hadoop.io.IntWritable

I am trying to run a map reducer in java for a comma separated file containing data about an AIRLINE which crashed.

The data contains the following columns and a sample data is also provided :

passengerid,survived(s=0,d=1),pclass,name,sex,age,sibsp,parch,ticket,fare,cabin,embarked
1,0,3,"Braund Mr. Owen Harris",male,22,1,0,A/5 21171,7.25,,S,2,1,1,"Cumings Mrs. John Bradley (Florence Briggs Thayer)",female,38,1,0,PC 17599,71.2833,C85,C,
3,1,3,"Heikkinen Miss. Laina",female,26,0,0,STON/O2. 3101282,7.925,,S,
4,1,1,"Futrelle Mrs. Jacques Heath (Lily May Peel)",female,35,1,0,113803,53.1,C123,S,
5,0,3,"Allen Mr. William Henry",male,35,0,0,373450,8.05,,S,
6,0,3,"Moran Mr. James",male,,0,0,330877,8.4583,,Q,
7,0,1,"McCarthy Mr. Timothy J",male,54,0,0,17463,51.8625,E46,S,
8,0,3,"Palsson Master. Gosta Leonard",male,2,3,1,349909,21.075,,S,
9,1,3,"Johnson Mrs. Oscar W (Elisabeth Vilhelmina Berg)",female,27,0,2,347742,11.1333,,S,
10,1,2,"Nasser Mrs. Nicholas (Adele Achem)",female,14,1,0,237736,30.0708,,C,
11,1,3,"Sandstrom Miss. Marguerite Rut",female,4,1,1,PP 9549,16.7,G6,S,
12,1,1,"Bonnell Miss. Elizabeth",female,58,0,0,113783,26.55,C103,S,  
...

My objective is to find the average age of people died in this crash. Here's my code snippets and the errors i encountered:

Airline.airlineDriver.java:

package Airline;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


import Airline.airlineMapper;
import Airline.airlineReducer;


public class airlineDriver {

public static void main(String[] args) throws IOException, 
ClassNotFoundException, InterruptedException, URISyntaxException {
    // TODO Auto-generated method stub
    Configuration conf = new Configuration();

    Job j = Job.getInstance(conf);// getConf()
    j.setJobName("Airline Job");
    j.setJarByClass(airlineDriver.class );
    j.setMapperClass(airlineMapper.class );
    j.setNumReduceTasks(2);
    j.setReducerClass(airlineReducer.class);
    j.setMapOutputKeyClass(IntWritable.class);
    j.setMapOutputValueClass(Text.class);
    j.setOutputKeyClass(Text.class);
    j.setOutputValueClass(FloatWritable.class);

    FileInputFormat.addInputPath(j, new Path(args[0]));
    FileOutputFormat.setOutputPath(j, new Path(args[1]));


    System.exit(j.waitForCompletion(true) ? 0 : 1);
   }

  }

Airline.airlineMapper.java:

package Airline;
import java.io.IOException;
import java.util.*;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class airlineMapper extends Mapper<LongWritable,  Text, 
IntWritable,Text> {
@Override
protected void map(LongWritable key, Text 
value,org.apache.hadoop.mapreduce.Mapper.Context context)throws IOException, 
InterruptedException {
    String inputstring = value.toString(); //converts input Text value to String
    IntWritable resKEY = new IntWritable();
    Text resVALUE = new Text();
    String str[] = inputstring.split(","); //splits it into array
    int bool = Integer.parseInt(str[1]); //fetch survived(s=0) or dead(d=1)
    if (bool == 1){
        resVALUE.set(str[5]);
        resKEY.set(bool);
        context.write(resKEY,resVALUE); //write key value pair to partitioner and reducer
}  }
}

Airline.airlineReducer.java :

package Airline;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
public class airlineReducer extends Reducer<IntWritable,Text, Text, 
FloatWritable> {

@Override
public void reduce(IntWritable key, Iterable<Text> values, Context context)
throws IOException, InterruptedException { 
    float y = 0;  //numerator for avgage
    float avgage =0;  
    int counter = 0;  //denominator for avgage 
    String a = "Average age";
    for(Text x : values)  //passes value to x one-by-one
    {
        String z = x.toString();   //converts text to string
        if((!z.equals(""))&&(!z.equals(null))){  //eliminating any empty string (**possible source of error)
        y +=Float.parseFloat(z);  //converting age to float since csf contains floating point ages
        counter++; //incrementing counter for total no. of people(records)
    
    
    }else{continue;  //if a null or empty string is encountered loop may skip the iteration and continue to next record
    //in case few fields are left empty 
    }//**possible source of error
        }
    avgage = y/counter;    //average formula
    context.write(new Text(a), new FloatWritable(avgage) ); //wirting output data
    
    }
    }

Even on making repeated changes in code i am still getting errors like:

Error: java.lang.ArrayIndexOutOfBoundsException: 1
at Airline.airlineMapper.map(airlineMapper.java:18)
at Airline.airlineMapper.map(airlineMapper.java:1)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:787)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:164)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs
(UserGroupInformation.java:1693)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)

plz help

Upvotes: 0

Views: 374

Answers (1)

Shrey_Shrma
Shrey_Shrma

Reputation: 11

As @RameshMaharjan answered The correct mapper and reducer class would be:

Airline.airlineMapper (THE CODE WHICH I MENTIONED BEFORE IS ALSO COMMENTED BELOW. THESE CHECKS str.length==12 and str[5].matches("\\d+") CAN BE USED WITH THE PREVIOUS CODE AND IT MUST WORK JUST FINE) :

package Airline;
import java.io.IOException;
import java.util.*;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer.Context;
public class airlineMapper extends   
Mapper<LongWritable,Text,Text,IntWritable> {
Text gender = new Text();
IntWritable age = new IntWritable();


protected void map(LongWritable key,Text value,Context context)throws 
IOException, InterruptedException {
    String inputstring = value.toString();
       String str[]=inputstring.split(",");
             if(str.length==12){
                 gender.set(str[4]);
             if(str[1].equals("0") ){
               if(str[5].matches("\\d+")){
                  int i=Integer.parseInt(str[5]);
                    age.set(i);
        }
       }
     }
        context.write(gender, age);

     //String inputstring = value.toString();
     // String[] str = inputstring.split(",");
     // IntWritable resKEY = new IntWritable();
     // Text resVALUE = new Text();
     // 
     // int bool = Integer.parseInt(str[1]);
     // if (bool == 1 && str[5].length() >= 1){
     //     resVALUE.set(str[5]);
     //     resKEY.set(bool);
     // context.write(resKEY,resVALUE);
     // }
      }}

Airline.airlineReducer (THE IMPORTANT THING TO NOTICE IN THE PREVIOUS MAP-REDUCE CODE IS, I TRIED TO CHECK THE STRING IN REDUCER PHASE, WHICH SHOULD BE DONE IN THE MAPPER PHASE ITSELF. as @RAMESHMAHARJAN pointed out before) :

package Airline;
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
public class airlineReducer extends Reducer<Text,IntWritable, Text, 
IntWritable> {

@Override
public void reduce( Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException { 
    int sum = 0;
     int total_people=0;
     for (IntWritable val : values) {
     total_people+=1;
     sum += val.get();
     }
     sum=sum/total_people;
     context.write(key, new IntWritable(sum));


    //float y = 0;
    //float avgage =0;
    //int counter = 0;
    //String a = "Average age";
    //Text resKEY = new Text();
    //FloatWritable resVALUE = new FloatWritable();
    //  for(Text x : values)
    //  {
    //      String z = x.toString();
    //      
    //      y +=Float.parseFloat(z);
    //      counter++;
    //  
    //  
    //  }   
    //  avgage = y/counter;
    //  resKEY.set(a);
    //  resVALUE.set(avgage);
    //  context.write(resKEY, resVALUE );

}
}

Upvotes: 1

Related Questions