Mandrek

Reputation: 1211

How to solve org.apache.hadoop.io.LongWritable cannot be cast to org.apache.hadoop.io.Text

I am trying to analyze retail store data where I want to work out the breakdown of sales by city. Here is my data:

Date       Time   City        Product-Cat       Sale-Value  Payment-Mode
2012-01-01 09:20  Fort Worth  Women's Clothing  153.57      Visa
2012-01-01 09:00  San Jose    Mens Clothing     214.05      Rupee
2012-01-01 09:00  San Diego   Music             76.43       Amex
2012-01-01 09:00  New York    Cameras           45.76       Visa

Now I want to calculate the sales breakdown by product category across all the stores.

Here are the mapper, the reducer, and the main class:

public class RetailDataAnalysis {

public static class RetailDataAnalysisMapper extends Mapper<Text,Text,Text,Text>{

   // when trying with LongWritable Key 
    public void map(LongWritable key,Text Value,Context context) throws IOException, InterruptedException{

        String analyser [] = Value.toString().split(",");
        Text productCategory = new Text(analyser[3]);
        Text salesPrice = new Text(analyser[4]);
        context.write(productCategory, salesPrice);
    }

 // When trying with Text key

    public void map(Text key,Text Value,Context context) throws IOException, InterruptedException{

        String analyser [] = Value.toString().split(",");
        Text productCategory = new Text(analyser[3]);
        Text salesPrice = new Text(analyser[4]);
        context.write(productCategory, salesPrice);
    }


}


public static class RetailDataAnalysisReducer extends Reducer<Text,Text,Text,Text>{

    protected void reduce(Text key,Iterable<Text> values,Context context)throws IOException, InterruptedException{
        String csv ="";
        for(Text value:values){

            if(csv.length()>0){
                csv+= ",";
            }
            csv+=value.toString();
        }
        context.write(key, new Text(csv));
    }
}

public static void main(String[] args) throws Exception {
    Configuration conf =  new  Configuration();
    String [] otherArgs = new GenericOptionsParser(conf,args).getRemainingArgs();
    if(otherArgs.length<2){
        System.out.println("Usage Retail Data ");
        System.exit(2);
    }
    Job job= new Job(conf,"Retail Data Analysis");
    job.setJarByClass(RetailDataAnalysis.class);
    job.setMapperClass(RetailDataAnalysisMapper.class);
    job.setCombinerClass(RetailDataAnalysisReducer.class);
    job.setReducerClass(RetailDataAnalysisReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

    for(int i=0;i<otherArgs.length-1;++i){
        FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
    }
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length-1]));
    System.exit(job.waitForCompletion(true)?0:1);
  }
}

And the exception I am getting when using the LongWritable key is:

   18/04/11 09:15:40 INFO mapreduce.Job: Task Id : attempt_1523355254827_0008_m_000000_2, Status : FAILED
  Error: java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.io.Text, received org.apache.hadoop.io.LongWritable
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1069)

The exception I am getting when trying to use a Text key:

   Error: java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.io.Text, received org.apache.hadoop.io.LongWritable
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1069)
at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:712)
at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.write(WrappedMapper.java:112)
at org.apache.hadoop.mapreduce.Mapper.map(Mapper.java:124)

Please help me to solve this, I am very new to Hadoop.

Upvotes: 0

Views: 2791

Answers (2)

Deepan Ram

Reputation: 850

When you read a file using MapReduce, the default file input format reads an entire line and sends it to the mapper as a <LongWritable offset, Text line> pair, so the mapper signature becomes:

public static class RetailDataAnalysisMapper extends Mapper<LongWritable,Text,Text,Text>

In case you need to read it as

public static class RetailDataAnalysisMapper extends Mapper<Text,Text,Text,Text>

you would need to change the file input format and use your own custom file input format along with a custom record reader. Then you need to add the following line in the driver code:

job.setInputFormatClass(YourCustomInputFormat.class); // YourCustomInputFormat is a placeholder for your own class
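For illustration only, here is a minimal sketch of what such a custom input format and record reader could look like. The class names and the choice to split each line at the first comma are assumptions made for this example, not anything from the question; the reader simply wraps the standard LineRecordReader and re-exposes every line as a <Text, Text> pair:

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;

// Sketch only: class names and the "split at the first comma" rule are assumptions.
public class TextKeyInputFormat extends FileInputFormat<Text, Text> {

    @Override
    public RecordReader<Text, Text> createRecordReader(InputSplit split, TaskAttemptContext context) {
        return new TextKeyRecordReader();
    }

    // Wraps the standard LineRecordReader and re-exposes each line as a <Text, Text> pair.
    public static class TextKeyRecordReader extends RecordReader<Text, Text> {
        private final LineRecordReader lineReader = new LineRecordReader();
        private final Text key = new Text();
        private final Text value = new Text();

        @Override
        public void initialize(InputSplit split, TaskAttemptContext context)
                throws IOException, InterruptedException {
            lineReader.initialize(split, context);
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            if (!lineReader.nextKeyValue()) {
                return false;
            }
            String line = lineReader.getCurrentValue().toString();
            int comma = line.indexOf(',');
            // Everything before the first comma becomes the key, the rest becomes the value.
            key.set(comma >= 0 ? line.substring(0, comma) : line);
            value.set(comma >= 0 ? line.substring(comma + 1) : "");
            return true;
        }

        @Override
        public Text getCurrentKey() {
            return key;
        }

        @Override
        public Text getCurrentValue() {
            return value;
        }

        @Override
        public float getProgress() throws IOException, InterruptedException {
            return lineReader.getProgress();
        }

        @Override
        public void close() throws IOException {
            lineReader.close();
        }
    }
}

With something like this registered via job.setInputFormatClass(TextKeyInputFormat.class), a Mapper<Text, Text, Text, Text> would receive Text keys instead of LongWritable offsets.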

Hadoop understands everything in the form of <key, value> pairs, so when you read a file, the byte offset becomes the LongWritable key and the line read becomes the Text value. So you need to use the default signature of Mapper<LongWritable, Text, <anything>, <anything>>.
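Applied to the asker's comma-separated records, a minimal sketch of a mapper with that default signature could look like the following (field positions are taken from the question's own code, and the class is assumed to sit inside RetailDataAnalysis with the usual Hadoop imports):

public static class RetailDataAnalysisMapper extends Mapper<LongWritable, Text, Text, Text> {

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // The key is just the byte offset of the line; only the line itself is needed.
        String[] fields = value.toString().split(",");
        Text productCategory = new Text(fields[3]); // Product-Cat column
        Text salesPrice = new Text(fields[4]);      // Sale-Value column
        context.write(productCategory, salesPrice);
    }
}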

Upvotes: 0

Michal Lonski

Reputation: 887

You may need a different input format class. By default TextInputFormat is used, which splits the file line by line and gives the byte offset of each line as a LongWritable key and the line itself as a Text value.

You can specify the input format class this way:

job.setInputFormatClass(TextInputFormat.class);

In your case, since you do not need the key, only the values, you can simply keep LongWritable as the key type:

public static class RetailDataAnalysisMapper extends Mapper<LongWritable, Text, Text, Text> {
    public void map(LongWritable key, Text Value, Context context) throws IOException, InterruptedException {
        //...
    }
}

Edit:

Here is the whole code after modifying it to use LongWritable as the key:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class RetailDataAnalysis {

    public static class RetailDataAnalysisMapper extends Mapper<LongWritable, Text, Text, Text> {

        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] analyser = value.toString().split(",");
            Text productCategory = new Text(analyser[3]);
            Text salesPrice = new Text(analyser[4]);
            context.write(productCategory, salesPrice);
        }
    }

    public static class RetailDataAnalysisReducer extends Reducer<Text, Text, Text, Text> {

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            String csv = "";
            for (Text value : values) {
                if (csv.length() > 0) {
                    csv += ",";
                }
                csv += value.toString();
            }
            context.write(key, new Text(csv));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 2) {
            System.err.println("Usage: RetailDataAnalysis <input path> [<input path>...] <output path>");
            System.exit(2);
        }
        Job job = new Job(conf, "Retail Data Analysis");
        job.setJarByClass(RetailDataAnalysis.class);
        job.setMapperClass(RetailDataAnalysisMapper.class);
        job.setCombinerClass(RetailDataAnalysisReducer.class);
        job.setReducerClass(RetailDataAnalysisReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        for (int i = 0; i < otherArgs.length - 1; ++i) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
        }
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Also, since you are splitting the data on ",", your data should be a CSV, like this (with date and time as separate fields, so that index 3 is the product category and index 4 is the sale value, as in your code):

2012-01-01,09:20,Fort Worth,Women's Clothing,153.57,Visa
2012-01-01,09:00,San Jose,Mens Clothing,214.05,Rupee
2012-01-01,09:00,San Diego,Music,76.43,Amex
2012-01-01,09:00,New York,Cameras,45.76,Visa

Not space-separated, as you showed it in your question.

Upvotes: 1
