Reputation: 3576
I'm trying to read data out of HBase and save it as a SequenceFile, but I'm getting this error:
java.io.IOException: Could not find a serializer for the Value class: 'org.apache.hadoop.hbase.client.Result'. Please ensure that the configuration 'io.serializations' is properly configured, if you're using custom serialization.
I saw two similar posts:
hadoop writables NotSerializableException with Apache Spark API
and
Spark HBase Join Error: object not serializable class: org.apache.hadoop.hbase.client.Result
Following those two posts, I registered three classes with Kryo, but still no luck.
Here's my program:
String tableName = "validatorTableSample";
System.out.println("Start indexing hbase: " + tableName);
SparkConf sparkConf = new SparkConf().setAppName("HBaseRead");
Class[] classes = {org.apache.hadoop.io.LongWritable.class, org.apache.hadoop.io.Text.class, org.apache.hadoop.hbase.client.Result.class};
sparkConf.registerKryoClasses(classes);
JavaSparkContext sc = new JavaSparkContext(sparkConf);
Configuration conf = HBaseConfiguration.create();
conf.set(TableInputFormat.INPUT_TABLE, tableName);
// conf.setStrings("io.serializations",
// conf.get("io.serializations"),
// MutationSerialization.class.getName(),
// ResultSerialization.class.getName());
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
JavaPairRDD<ImmutableBytesWritable, Result> hBasePairRDD = sc.newAPIHadoopRDD(
conf,
TableInputFormat.class,
ImmutableBytesWritable.class,
Result.class);
hBasePairRDD.saveAsNewAPIHadoopFile("/tmp/tempOutputPath", ImmutableBytesWritable.class, Result.class, SequenceFileOutputFormat.class);
System.out.println("Finished readFromHbaseAndSaveAsSequenceFile() .........");
Here's the error stacktrace:
java.io.IOException: Could not find a serializer for the Value class: 'org.apache.hadoop.hbase.client.Result'. Please ensure that the configuration 'io.serializations' is properly configured, if you're using custom serialization.
at org.apache.hadoop.io.SequenceFile$Writer.init(SequenceFile.java:1254)
at org.apache.hadoop.io.SequenceFile$Writer.<init>(SequenceFile.java:1156)
at org.apache.hadoop.io.SequenceFile.createWriter(SequenceFile.java:273)
at org.apache.hadoop.io.SequenceFile.createWriter(SequenceFile.java:530)
at org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat.getSequenceWriter(SequenceFileOutputFormat.java:64)
at org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat.getRecordWriter(SequenceFileOutputFormat.java:75)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1112)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1095)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
17/05/25 10:58:38 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.io.IOException: Could not find a serializer for the Value class: 'org.apache.hadoop.hbase.client.Result'. Please ensure that the configuration 'io.serializations' is properly configured, if you're using custom serialization.
17/05/25 10:58:38 ERROR TaskSetManager: Task 0 in stage 0.0 failed 1 times; aborting job
Upvotes: 1
Views: 2271
Reputation: 35
The following code has already passed testing; it reads Result values back from a SequenceFile and logs each cell:
object HbaseDataExport extends LoggingTime {
  def main(args: Array[String]): Unit = {
    val con = SparkConfig.getProperties()
    val sparkConf = SparkConfig.getSparkConf()
    val sc = SparkContext.getOrCreate(sparkConf)

    val config = HBaseConfiguration.create()
    // Register HBase's serializers with Hadoop's serialization factory.
    config.setStrings("io.serializations",
      config.get("io.serializations"),
      "org.apache.hadoop.hbase.mapreduce.MutationSerialization",
      "org.apache.hadoop.hbase.mapreduce.ResultSerialization")

    val path = "/Users/jhTian/Desktop/hbaseTimeData/part-m-00030"
    val path1 = "hdfs://localhost:9000/hbaseTimeData/part-m-00030"
    sc.newAPIHadoopFile(path1, classOf[SequenceFileInputFormat[Text, Result]], classOf[Text], classOf[Result], config).foreach(x => {
      import collection.JavaConversions._
      for (i <- x._2.listCells) {
        logger.info(s"family:${Bytes.toString(CellUtil.cloneFamily(i))},qualifier:${Bytes.toString(CellUtil.cloneQualifier(i))},value:${Bytes.toString(CellUtil.cloneValue(i))}")
      }
    })
    sc.stop()
  }
}
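The key lines here are the two io.serializations entries: they register HBase's ResultSerialization (and MutationSerialization) with Hadoop's serialization factory, which is how SequenceFile readers and writers look up a serializer for Result values. The same setting is needed on both the write and the read side.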
Upvotes: 0
Reputation: 1585
Here is what was needed to make it work.
Because we use HBase to store our data and the job outputs HBase Result objects, Hadoop is telling us that it doesn't know how to serialize our data, so we need to help it. When setting up the job configuration, set the io.serializations property:
conf.setStrings("io.serializations", new String[]{hbaseConf.get("io.serializations"), MutationSerialization.class.getName(), ResultSerialization.class.getName()});
Upvotes: 1
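Applied to the original Java program, a minimal sketch of the fix might look like the following. It assumes HBase's serializer classes org.apache.hadoop.hbase.mapreduce.MutationSerialization and org.apache.hadoop.hbase.mapreduce.ResultSerialization are on the classpath; the class name HBaseToSequenceFile is illustrative, and the serializer class names are passed as strings (as in the tested Scala answer above) to avoid class-visibility differences across HBase versions.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class HBaseToSequenceFile { // illustrative name
    public static void main(String[] args) {
        String tableName = "validatorTableSample";
        JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("HBaseRead"));

        Configuration conf = HBaseConfiguration.create();
        conf.set(TableInputFormat.INPUT_TABLE, tableName);
        // Append HBase's serializers to Hadoop's serialization factory so that
        // SequenceFile.Writer can find a serializer for Result values.
        conf.setStrings("io.serializations",
                conf.get("io.serializations"),
                "org.apache.hadoop.hbase.mapreduce.MutationSerialization",
                "org.apache.hadoop.hbase.mapreduce.ResultSerialization");

        JavaPairRDD<ImmutableBytesWritable, Result> hBasePairRDD = sc.newAPIHadoopRDD(
                conf,
                TableInputFormat.class,
                ImmutableBytesWritable.class,
                Result.class);

        // Pass the same conf here so the SequenceFile writer sees io.serializations.
        hBasePairRDD.saveAsNewAPIHadoopFile("/tmp/tempOutputPath",
                ImmutableBytesWritable.class, Result.class,
                SequenceFileOutputFormat.class, conf);

        sc.stop();
    }
}
Note that Kryo registration is not needed for this path: the SequenceFile writer goes through Hadoop's serialization mechanism, not Spark's, so only io.serializations matters here.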