Reputation: 18611
I am trying to output to a HBase table directly from my Mapper while using Hadoop2.4.0 with HBase0.94.18 on EMR.
I am getting a nasty IOException: Pass a Delete or a Put
when executing the code below.
public class TestHBase {
static class ImportMapper
extends Mapper<MyKey, MyValue, ImmutableBytesWritable, Writable> {
private byte[] family = Bytes.toBytes("f");
@Override
public void map(MyKey key, MyValue value, Context context) {
MyItem item = //do some stuff with key/value and create item
byte[] rowKey = Bytes.toBytes(item.getKey());
Put put = new Put(rowKey);
for (String attr : Arrays.asList("a1", "a2", "a3")) {
byte[] qualifier = Bytes.toBytes(attr);
put.add(family, qualifier, Bytes.toBytes(item.get(attr)));
}
context.write(new ImmutableBytesWritable(rowKey), put);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = HBaseConfiguration.create();
String input = args[0];
String table = "table";
Job job = Job.getInstance(conf, "stuff");
job.setJarByClass(ImportMapper.class);
job.setInputFormatClass(SequenceFileInputFormat.class);
FileInputFormat.setInputDirRecursive(job, true);
FileInputFormat.addInputPath(job, new Path(input));
TableMapReduceUtil.initTableReducerJob(
table, // output table
null, // reducer class
job);
job.setNumReduceTasks(0);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
Does anyone know what I am doing wrong?
Stacktrace
Error: java.io.IOException: Pass a Delete or a Put at org.apache.hadoop.hbase.mapreduce.TableOutputFormat$TableRecordWriter.write(TableOutputFormat.java:125) at org.apache.hadoop.hbase.mapreduce.TableOutputFormat$TableRecordWriter.write(TableOutputFormat.java:84) at org.apache.hadoop.mapred.MapTask$NewDirectOutputCollector.write(MapTask.java:646) at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89) at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.write(WrappedMapper.java:112) at org.apache.hadoop.mapreduce.Mapper.map(Mapper.java:124) at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145) at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:775) at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341) at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:167) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:415) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548) at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:162) Container killed by the ApplicationMaster. Container killed on request. Exit code is 143 Container exited with a non-zero exit code 143
Upvotes: 2
Views: 996
Reputation: 7344
Thanks for adding the stack trace. Unfortunately you didn't include the code that threw the exception so I can't fully trace it for you. Instead I did a little searching around and discovered a few things for you.
Your stack trace is similar to one in another SO question here: Pass a Delete or a Put error in hbase mapreduce
That one solved the issue by commenting out job.setNumReduceTasks(0);
There is a similar SO question that had the same exception but couldn't solve the problem that way. Instead it was having a problem with annotations:
"java.io.IOException: Pass a Delete or a Put" when reading HDFS and storing HBase
Here are some good examples of how to write working code both with setNumReduceTasks at 0 and at 1 or more.
"51.2. HBase MapReduce Read/Write Example The following is an example of using HBase both as a source and as a sink with MapReduce. This example will simply copy data from one table to another.
Configuration config = HBaseConfiguration.create();
Job job = new Job(config,"ExampleReadWrite");
job.setJarByClass(MyReadWriteJob.class); // class that contains mapper
Scan scan = new Scan();
scan.setCaching(500); // 1 is the default in Scan, which will be bad for MapReduce jobs
scan.setCacheBlocks(false); // don't set to true for MR jobs
// set other scan attrs
TableMapReduceUtil.initTableMapperJob(
sourceTable, // input table
scan, // Scan instance to control CF and attribute selection
MyMapper.class, // mapper class
null, // mapper output key
null, // mapper output value
job);
TableMapReduceUtil.initTableReducerJob(
targetTable, // output table
null, // reducer class
job);
job.setNumReduceTasks(0);
boolean b = job.waitForCompletion(true);
if (!b) {
throw new IOException("error with job!");
}
This is the 1 or more example:
"51.4. HBase MapReduce Summary to HBase Example The following example uses HBase as a MapReduce source and sink with a summarization step. This example will count the number of distinct instances of a value in a table and write those summarized counts in another table.
Configuration config = HBaseConfiguration.create();
Job job = new Job(config,"ExampleSummary");
job.setJarByClass(MySummaryJob.class); // class that contains mapper and reducer
Scan scan = new Scan();
scan.setCaching(500); // 1 is the default in Scan, which will be bad for MapReduce jobs
scan.setCacheBlocks(false); // don't set to true for MR jobs
// set other scan attrs
TableMapReduceUtil.initTableMapperJob(
sourceTable, // input table
scan, // Scan instance to control CF and attribute selection
MyMapper.class, // mapper class
Text.class, // mapper output key
IntWritable.class, // mapper output value
job);
TableMapReduceUtil.initTableReducerJob(
targetTable, // output table
MyTableReducer.class, // reducer class
job);
job.setNumReduceTasks(1); // at least one, adjust as required
boolean b = job.waitForCompletion(true);
if (!b) {
throw new IOException("error with job!");
}
http://hbase.apache.org/book.html#mapreduce.example
You seem to be more closely following the first example. I wanted to show that sometimes there is a reason to set the number of reduce tasks to zero.
Upvotes: 0
Reputation: 6500
It would be better if you can show the full stack trace, so that i can help you solve it easily. I've not executed your code. As far as i've seen your code, this could be the issue
job.setNumReduceTasks(0);
Mapper will be expecting your put
object to write directly to Apache HBase.
You can increase the setNumReduceTasks
OR If you see the API you can find its default value and comment it.
Upvotes: 2