Cecilia

Reputation: 4721

Type mismatch between Mapper and Reducer

I have the following pattern of output types:

public static class RecordMapper extends Mapper<Object, Text, Text, RecordWritable>

public static class JoinSumReducer extends Reducer<Text, RecordWritable, Text, DoubleWritable>

I'm getting the following runtime exception: java.lang.Exception: java.io.IOException: Type mismatch in value from map: expected org.apache.hadoop.io.DoubleWritable, received RecordWritable (full stack trace after the code).

I tried the solution proposed in "Type mismatch in value from map: expected org.apache.hadoop.io.NullWritable, recieved org.apache.hadoop.io.Text", but this leads to a different runtime exception: java.io.IOException: wrong value class: class org.apache.hadoop.io.DoubleWritable is not class RecordWritable (full stack trace after the code).

So clearly there is a type mismatch somewhere, but I've traced all the value type definitions and I can't find what I'm missing. Is there another place where I need to declare which types are being used?

Here's my code.

The Writable Enum class

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;

/**
 * Writable for enum Record
 */
public class RecordWritable implements Writable {
    public static enum Record {BUY, CLICK};
    private Record data;

    public void set(Record data) {
        this.data = data;
    }

    public Record get() {
        return this.data;
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        data = WritableUtils.readEnum(dataInput, Record.class);
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        WritableUtils.writeEnum(dataOutput, data);
    }
}

The Mapper/Reducer and Main

import java.io.IOException;
import java.util.Scanner;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class SuccessRate {

    /**
     * Mapper
     * - Key = ItemID
     * - Value = The type of record is determined by number of columns
     */
    public static class RecordMapper extends Mapper<Object, Text, Text, RecordWritable> {

        private Text itemID = new Text();
        private RecordWritable record = new RecordWritable();
        Pattern itemIDpattern = Pattern.compile("^(\\d+),");
        Pattern columnPattern = Pattern.compile(",");

        public void map(Object key, Text value, Context context
                ) throws IOException, InterruptedException {
            Scanner itr = new Scanner(value.toString());
            while (itr.hasNextLine()) {
                String line = itr.nextLine();

                String id = null;
                Matcher m = itemIDpattern.matcher(line);
                if(m.find())
                    id = m.group(1);

                RecordWritable.Record fileType;
                int count = StringUtils.countMatches(line, ",");
                if(count==4)
                    fileType = RecordWritable.Record.CLICK;
                else
                    fileType = RecordWritable.Record.BUY;

                if(id != null) {
                    itemID.set(id);
                    record.set(fileType);
                    context.write(itemID, record);
                }
            }
            itr.close();
        }
    }

    /**
     * Reducer
     * - Key : ItemID
     * - Value : sum of buys / sum of clicks
     */
    public static class JoinSumReducer
    extends Reducer<Text, RecordWritable, Text, DoubleWritable> {
        private DoubleWritable result = new DoubleWritable();

        public void reduce(Text key, Iterable<RecordWritable> values,
                Context context
                ) throws IOException, InterruptedException {
            int sumClick = 0;
            int sumBuy = 0;
            for (RecordWritable val : values) {
                switch(val.get()) {
                case CLICK:
                    sumClick += 1;
                    break;
                case BUY:
                    sumBuy += 1;
                    break;
                }
            }
            result.set((double)sumBuy/(double)sumClick);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {       
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "success rate");
        job.setJarByClass(SuccessRate.class);
        job.setMapperClass(RecordMapper.class);
        job.setCombinerClass(JoinSumReducer.class);
        job.setReducerClass(JoinSumReducer.class);
//      job.setMapOutputKeyClass(Text.class); // I tried adding these two lines after reading https://stackoverflow.com/q/16926783/3303546
//      job.setMapOutputValueClass(RecordWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Full exception stack traces

Original error

java.lang.Exception: java.io.IOException: Type mismatch in value from map: expected org.apache.hadoop.io.DoubleWritable, received RecordWritable
    at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:492)
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:552)
Caused by: java.io.IOException: Type mismatch in value from map: expected org.apache.hadoop.io.DoubleWritable, received RecordWritable
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1093)
    at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:727)
    at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
    at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.write(WrappedMapper.java:112)
    at SuccessRate$RecordMapper.map(SuccessRate.java:54)
    at SuccessRate$RecordMapper.map(SuccessRate.java:26)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:146)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:799)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:347)
    at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:271)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Error after trying the solution proposed in the linked question

2018-09-24 11:36:04,423 INFO mapred.MapTask: Ignoring exception during close for org.apache.hadoop.mapred.MapTask$NewOutputCollector@5532c2f8
java.io.IOException: wrong value class: class org.apache.hadoop.io.DoubleWritable is not class RecordWritable
    at org.apache.hadoop.mapred.IFile$Writer.append(IFile.java:194)
    at org.apache.hadoop.mapred.Task$CombineOutputCollector.collect(Task.java:1562)
    at org.apache.hadoop.mapred.Task$NewCombinerRunner$OutputConverter.write(Task.java:1879)
    at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
    at org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer$Context.write(WrappedReducer.java:105)
    at SuccessRate$JoinSumReducer.reduce(SuccessRate.java:86)
    at SuccessRate$JoinSumReducer.reduce(SuccessRate.java:66)
    at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:171)
    at org.apache.hadoop.mapred.Task$NewCombinerRunner.combine(Task.java:1900)
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpill(MapTask.java:1662)
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.flush(MapTask.java:1505)
    at org.apache.hadoop.mapred.MapTask$NewOutputCollector.close(MapTask.java:735)
    at org.apache.hadoop.mapred.MapTask.closeQuietly(MapTask.java:2076)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:809)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:347)
    at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:271)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Upvotes: 1

Views: 290

Answers (2)

HbnKing

Reputation: 1882

The workflow in MapReduce looks like this:

mapper(inputKey1, inputValue1, outputKey1, outputValue1) => combiner(outputKey1, outputValue1, outputKey2, outputValue2) => reducer(outputKey2, outputValue2, outputKey3, outputValue3)

Each key and value data type must match at every hand-off.

Your error, Caused by: java.io.IOException: Type mismatch in value from map: expected org.apache.hadoop.io.DoubleWritable, received RecordWritable, is caused by setting the combiner with job.setCombinerClass(JoinSumReducer.class).

JoinSumReducer is a Reducer<Text, RecordWritable, Text, DoubleWritable>, so when it runs as a combiner its output value type (DoubleWritable) does not match the map output value type (RecordWritable). You can remove this line and try again.
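
For reference, the driver would then look roughly like this (an untested sketch). The two setMapOutput* lines you had commented out are also needed, because otherwise Hadoop assumes the map output types are the same as the job output types (DoubleWritable) and you would most likely still see the first error even without the combiner:

public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "success rate");
    job.setJarByClass(SuccessRate.class);
    job.setMapperClass(RecordMapper.class);
    // No combiner: JoinSumReducer emits DoubleWritable values, but the map
    // output value type is RecordWritable, so it cannot run as a combiner.
    job.setReducerClass(JoinSumReducer.class);
    // The map output types differ from the final output types, so declare them.
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(RecordWritable.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(DoubleWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}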

Upvotes: 1

Amin Heydari Alashti

Reputation: 1021

Firstly, you should uncomment those two lines in your main method body, because:

Calling job.setOutputKeyClass(...) / job.setOutputValueClass(...) sets the types expected as output from both the map and reduce phases.

If your Mapper emits different types than the Reducer, you can set the types emitted by the mapper with the Job's setMapOutputKeyClass() and setMapOutputValueClass() methods. These implicitly set the input types expected by the Reducer.

Secondly, the problem is that you did not take care of your combiner's input and output types.

The output types of a combiner must match the output types of the mapper, because Hadoop makes no guarantees about how many times the combiner is applied, or whether it is applied at all. That is exactly what breaks in your case: JoinSumReducer emits DoubleWritable values, but the map output value type is RecordWritable.
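
To make the constraint concrete for this job: anything passed to job.setCombinerClass() would have to consume and produce the map output types, (Text, RecordWritable). The hypothetical class below (the name PassThroughCombiner is just for illustration) only shows the required signature; it re-emits values unchanged and buys you nothing, which is why simply not setting a combiner is the reasonable fix here:

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Hypothetical, do-nothing combiner: legal for this job only because its input
// and output types both equal the map output types (Text, RecordWritable).
public class PassThroughCombiner
        extends Reducer<Text, RecordWritable, Text, RecordWritable> {

    @Override
    protected void reduce(Text key, Iterable<RecordWritable> values, Context context)
            throws IOException, InterruptedException {
        // Re-emit every value unchanged. context.write serializes immediately,
        // so reusing the iterator's value object is safe here.
        for (RecordWritable val : values) {
            context.write(key, val);
        }
    }
}

If you wanted a combiner that actually pre-aggregates, you would have to switch the intermediate value type to something that can carry partial buy/click counts instead of a single Record.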

For more information, see https://stackoverflow.com/a/14225304/2137378 and https://stackoverflow.com/a/30548793/2137378.

Upvotes: 0
