Reputation: 4721
I have the following Mapper and Reducer signatures
public static class RecordMapper extends Mapper<Object, Text, Text, RecordWritable>
public static class JoinSumReducer extends Reducer<Text, RecordWritable, Text, DoubleWritable>
I'm getting the following runtime exception java.lang.Exception: java.io.IOException: Type mismatch in value from map: expected org.apache.hadoop.io.DoubleWritable, received RecordWritable
(full stack trace after code).
I tried the solution proposed in Type mismatch in value from map: expected org.apache.hadoop.io.NullWritable, recieved org.apache.hadoop.io.Text, but this leads to a different runtime exception: java.io.IOException: wrong value class: class org.apache.hadoop.io.DoubleWritable is not class RecordWritable
(full stack trace after code).
So clearly there is a type mismatch somewhere, but I've checked all the value type definitions and I can't find what I'm missing. Is there another place where I need to declare which types are being used?
The Writable Enum class
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
/**
* Writable for enum Record
*/
public class RecordWritable implements Writable {
    public static enum Record {BUY, CLICK};

    private Record data;

    public void set(Record data) {
        this.data = data;
    }

    public Record get() {
        return this.data;
    }

    public void readFields(DataInput dataInput) throws IOException {
        data = WritableUtils.readEnum(dataInput, Record.class);
    }

    public void write(DataOutput dataOutput) throws IOException {
        WritableUtils.writeEnum(dataOutput, data);
    }
}
The Mapper/Reducer and Main
import java.io.IOException;
import java.util.Scanner;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class SuccessRate {

    /**
     * Mapper
     * - Key = ItemID
     * - Value = The type of record is determined by number of columns
     */
    public static class RecordMapper extends Mapper<Object, Text, Text, RecordWritable> {
        private Text itemID = new Text();
        private RecordWritable record = new RecordWritable();
        Pattern itemIDpattern = Pattern.compile("^(\\d+),");
        Pattern columnPattern = Pattern.compile(",");

        public void map(Object key, Text value, Context context
                        ) throws IOException, InterruptedException {
            Scanner itr = new Scanner(value.toString());
            while (itr.hasNextLine()) {
                String line = itr.nextLine();

                String id = null;
                Matcher m = itemIDpattern.matcher(line);
                if (m.find())
                    id = m.group(1);

                RecordWritable.Record fileType;
                int count = StringUtils.countMatches(line, ",");
                if (count == 4)
                    fileType = RecordWritable.Record.CLICK;
                else
                    fileType = RecordWritable.Record.BUY;

                if (id != null) {
                    itemID.set(id);
                    record.set(fileType);
                    context.write(itemID, record);
                }
            }
            itr.close();
        }
    }

    /**
     * Reducer
     * - Key : ItemID
     * - Value : sum of buys / sum of clicks
     */
    public static class JoinSumReducer
            extends Reducer<Text, RecordWritable, Text, DoubleWritable> {
        private DoubleWritable result = new DoubleWritable();

        public void reduce(Text key, Iterable<RecordWritable> values,
                           Context context
                           ) throws IOException, InterruptedException {
            int sumClick = 0;
            int sumBuy = 0;
            for (RecordWritable val : values) {
                switch (val.get()) {
                    case CLICK:
                        sumClick += 1;
                        break;
                    case BUY:
                        sumBuy += 1;
                        break;
                }
            }
            result.set((double) sumBuy / (double) sumClick);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "success rate");
        job.setJarByClass(SuccessRate.class);
        job.setMapperClass(RecordMapper.class);
        job.setCombinerClass(JoinSumReducer.class);
        job.setReducerClass(JoinSumReducer.class);
        // job.setMapOutputKeyClass(Text.class); // I tried adding these two lines after reading https://stackoverflow.com/q/16926783/3303546
        // job.setMapOutputValueClass(RecordWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
Original error
java.lang.Exception: java.io.IOException: Type mismatch in value from map: expected org.apache.hadoop.io.DoubleWritable, received RecordWritable
at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:492)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:552)
Caused by: java.io.IOException: Type mismatch in value from map: expected org.apache.hadoop.io.DoubleWritable, received RecordWritable
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1093)
at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:727)
at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.write(WrappedMapper.java:112)
at SuccessRate$RecordMapper.map(SuccessRate.java:54)
at SuccessRate$RecordMapper.map(SuccessRate.java:26)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:146)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:799)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:347)
at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:271)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
I tried the solution proposed in Type mismatch in value from map: expected org.apache.hadoop.io.NullWritable, recieved org.apache.hadoop.io.Text, but this leads to a runtime exception:
2018-09-24 11:36:04,423 INFO mapred.MapTask: Ignoring exception during close for org.apache.hadoop.mapred.MapTask$NewOutputCollector@5532c2f8
java.io.IOException: wrong value class: class org.apache.hadoop.io.DoubleWritable is not class RecordWritable
at org.apache.hadoop.mapred.IFile$Writer.append(IFile.java:194)
at org.apache.hadoop.mapred.Task$CombineOutputCollector.collect(Task.java:1562)
at org.apache.hadoop.mapred.Task$NewCombinerRunner$OutputConverter.write(Task.java:1879)
at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
at org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer$Context.write(WrappedReducer.java:105)
at SuccessRate$JoinSumReducer.reduce(SuccessRate.java:86)
at SuccessRate$JoinSumReducer.reduce(SuccessRate.java:66)
at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:171)
at org.apache.hadoop.mapred.Task$NewCombinerRunner.combine(Task.java:1900)
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpill(MapTask.java:1662)
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.flush(MapTask.java:1505)
at org.apache.hadoop.mapred.MapTask$NewOutputCollector.close(MapTask.java:735)
at org.apache.hadoop.mapred.MapTask.closeQuietly(MapTask.java:2076)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:809)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:347)
at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:271)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Upvotes: 1
Views: 290
Reputation: 1882
The workflow in a MapReduce job looks like this:
mapper(inputKey1, inputValue1 → outputKey1, outputValue1) => combiner(outputKey1, outputValue1 → outputKey2, outputValue2) => reducer(outputKey2, outputValue2 → outputKey3, outputValue3)
Each key and value data type must match from one stage to the next.
The error Caused by: java.io.IOException: Type mismatch in value from map: expected org.apache.hadoop.io.DoubleWritable, received RecordWritable comes from setting the combiner with job.setCombinerClass(JoinSumReducer.class).
JoinSumReducer emits DoubleWritable values, so the combiner's output does not match the RecordWritable values your reducer expects as input.
You can remove that line and try again; a sketch of an adjusted driver follows.
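For illustration, here is a minimal sketch of the main method with the combiner removed. It also uncomments the two setMapOutput*Class calls already present in the question, since the map output types differ from the job's final output types. Treat this as an untested sketch of the combined fix, not a verified solution:

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "success rate");
        job.setJarByClass(SuccessRate.class);
        job.setMapperClass(RecordMapper.class);
        // No combiner: JoinSumReducer emits DoubleWritable, which cannot be
        // re-consumed as the RecordWritable values the reducer expects.
        job.setReducerClass(JoinSumReducer.class);

        // Map output types differ from the final output types, so declare them.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(RecordWritable.class);

        // Final (reducer) output types.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }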
Upvotes: 1
Reputation: 1021
Firstly, you should uncomment those two lines in your main method body, because:
Calling job.setOutputKeyClass( NullWritable.class ); will set the types expected as output from both the map and reduce phases.
If your Mapper emits different types than the Reducer, you can set the types emitted by the mapper with the JobConf's setMapOutputKeyClass() and setMapOutputValueClass() methods. These implicitly set the input types expected by the Reducer.
Secondly, the problem is that you did not take care of the combiner's input and output types.
The output types of a combiner must match the output types of the mapper. Hadoop makes no guarantees on how many times the combiner is applied, or whether it is applied at all. That is exactly what happens in your case.
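To make that constraint concrete, here is a minimal sketch of what a type-correct combiner for this job would have to look like. The name IdentityRecordCombiner is hypothetical, and because RecordWritable only carries a single enum value, such a combiner cannot pre-aggregate anything; dropping the combiner entirely is the simpler fix:

    public static class IdentityRecordCombiner
            extends Reducer<Text, RecordWritable, Text, RecordWritable> {
        // Input and output value types are both RecordWritable, matching the
        // mapper's output -- this is the type contract a combiner must satisfy.
        @Override
        public void reduce(Text key, Iterable<RecordWritable> values, Context context)
                throws IOException, InterruptedException {
            for (RecordWritable val : values) {
                context.write(key, val);  // re-emit each value unchanged
            }
        }
    }

It would then be registered with job.setCombinerClass(IdentityRecordCombiner.class); instead of JoinSumReducer.class.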
For more information, see https://stackoverflow.com/a/14225304/2137378 and https://stackoverflow.com/a/30548793/2137378
Upvotes: 0