Reputation: 1953
I am programming MapR with Avro, and a real beginner against Avro. The input and output are both avro format with specific schema.
Here is my mapper and reducer using mapreduce API of MR1:
public class UserClassifyMapReduce extends Configured implements Tool {
private final static Logger logger = LoggerFactory.getLogger(UserClassifyMapReduce.class);
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new UserClassifyMapReduce(), args);
System.exit(res);
}
@Override
public int run(String[] args) throws Exception {
if (args.length < 2) {
logger.error("Usage: UserClassify <intputfile> <outputfolder>");
System.exit(-1);
}
Configuration conf = new Configuration();
Job job = new Job(getConf());
job.setJobName("UserClassify");
AvroJob.setInputKeySchema(job, NetflowRecord.getClassSchema());
AvroJob.setOutputKeySchema(job, NetflowRecord.getClassSchema());
FileInputFormat.setInputPaths(job, new Path(args[0]));
Path outPath = new Path(args[1]);
FileOutputFormat.setOutputPath(job, outPath);
outPath.getFileSystem(conf).delete(outPath, true);
job.setJarByClass(DataSerializeMapReduce.class);
job.setMapperClass(MyAvroMap.class);
job.setReducerClass(MyAvroReduce.class);
job.setInputFormatClass(AvroKeyInputFormat.class);
job.setOutputFormatClass(AvroKeyOutputFormat.class);
job.setMapOutputKeyClass(AvroKey.class);
job.setMapOutputValueClass(AvroValue.class);
job.setOutputKeyClass(AvroKey.class);
job.setOutputValueClass(NullWritable.class);
return job.waitForCompletion(true) ? 0 : 1;
}
public static class MyAvroMap extends Mapper<AvroKey<NetflowRecord>, NullWritable,
AvroKey<CharSequence>, AvroValue<NetflowRecord>>{
@Override
protected void map(AvroKey<NetflowRecord> key, NullWritable value, Context context)
throws IOException, InterruptedException{
CharSequence devMac = key.datum().getDevMacAddr();
context.write(new AvroKey<CharSequence>(devMac), new AvroValue<NetflowRecord>(key.datum()));
}
}
public static class MyAvroReduce extends Reducer<AvroKey<CharSequence>, AvroValue<NetflowRecord>,
AvroKey<NetflowRecord>, NullWritable>{
@Override
protected void reduce(AvroKey<CharSequence> key, Iterable<AvroValue<NetflowRecord>> values, Context context)
throws IOException, InterruptedException{
(...code)
}
}
}
The CastError throws messages like
java.lang.Exception: java.lang.ClassCastException: class org.apache.avro.mapred.AvroKey
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:354)
Caused by: java.lang.ClassCastException: class org.apache.avro.mapred.AvroKey
at java.lang.Class.asSubclass(Class.java:3116)
at org.apache.hadoop.mapred.JobConf.getOutputKeyComparator(JobConf.java:795)
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.<init>(MapTask.java:964)
at org.apache.hadoop.mapred.MapTask$NewOutputCollector.<init>(MapTask.java:673)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:756)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:364)
at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:223)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
at java.util.concurrent.FutureTask.run(FutureTask.java:166)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:724)
A very simple program. Do u have any idea about this problem. Thanks alot.
Jamin
Upvotes: 1
Views: 2546
Reputation: 2736
You appear to be lacking a schema for the mapper output key AvroKey<CharSequence>
. Adding the corresponding schema should be sufficient:
AvroJob.setMapOutputKeySchema(job, Schema.create(Schema.Type.STRING));
Upvotes: 1