Reputation: 941
I would like to connect to HBase using Spark, but I am getting an exception. When I try to do the same thing directly from Scala, I do not get such errors.
I am using Scala 2.10.4 and Spark 1.3.0 on CDH 5.4.0.
The code is:
import org.apache.spark.SparkContext
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.Put
import org.apache.spark.SparkConf
import com.cloudera.spark.hbase.HBaseContext

object HBaseBulkPutExample {
  def main(args: Array[String]) {
    val tableName = "image_table"
    val columnFamily = "Image Data"
    val sparkConf = new SparkConf().setAppName("HBaseBulkPutExample " + tableName + " " + columnFamily)
    val sc = new SparkContext(sparkConf)

    val rdd = sc.parallelize(Array(
      (Bytes.toBytes("1"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("1")))),
      (Bytes.toBytes("2"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("2")))),
      (Bytes.toBytes("3"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("3")))),
      (Bytes.toBytes("4"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("4")))),
      (Bytes.toBytes("5"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("5"))))
    ))

    val conf = HBaseConfiguration.create()
    conf.addResource(new Path("/eds/servers//hbase-1.0.1.1/conf/hbase-site.xml"))
    val hbaseContext = new HBaseContext(sc, conf)

    hbaseContext.bulkPut[(Array[Byte], Array[(Array[Byte], Array[Byte], Array[Byte])])](rdd,
      tableName,
      (putRecord) => {
        val put = new Put(putRecord._1)
        putRecord._2.foreach((putValue) => put.add(putValue._1, putValue._2, putValue._3))
        put
      },
      true)
  }
}
When I create a jar and execute it, I get the following error:
org.apache.hadoop.hbase.DoNotRetryIOException: java.lang.NoSuchMethodError: org.apache.hadoop.net.NetUtils.getInputStream(Ljava/net/Socket;)Lorg/apache/hadoop/net/SocketInputWrapper;
at org.apache.hadoop.hbase.ipc.RpcClient$Connection.setupIOstreams(RpcClient.java:928)
at org.apache.hadoop.hbase.ipc.RpcClient.getConnection(RpcClient.java:1543)
at org.apache.hadoop.hbase.ipc.RpcClient.call(RpcClient.java:1442)
at org.apache.hadoop.hbase.ipc.RpcClient.callBlockingMethod(RpcClient.java:1661)
at org.apache.hadoop.hbase.ipc.RpcClient$BlockingRpcChannelImplementation.callBlockingMethod(RpcClient.java:1719)
at org.apache.hadoop.hbase.protobuf.generated.ClientProtos$ClientService$BlockingStub.get(ClientProtos.java:30304)
at org.apache.hadoop.hbase.protobuf.ProtobufUtil.getRowOrBefore(ProtobufUtil.java:1562)
at org.apache.hadoop.hbase.client.HTable$2.call(HTable.java:711)
at org.apache.hadoop.hbase.client.HTable$2.call(HTable.java:709)
at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithRetries(RpcRetryingCaller.java:114)
at org.apache.hadoop.hbase.client.HTable.getRowOrBefore(HTable.java:715)
at org.apache.hadoop.hbase.client.MetaScanner.metaScan(MetaScanner.java:144)
at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.prefetchRegionCache(HConnectionManager.java:1140)
Upvotes: 3
Views: 8153
Reputation: 11
I got a similar problem because of version mismatches between the Spark and HBase libraries. I upgraded to Spark 1.4 and Scala 2.11.6, and used HBase 1.0.1.1; after that I never got this problem again. A jar version mismatch was causing it: the Spark jar expected an upgraded method in the HBase client jar and failed when it was missing.
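For illustration, a minimal build.sbt sketch that pins the versions mentioned above so the Spark and HBase client jars agree (the exact HBase artifact list is an assumption; your project may need additional modules):

scalaVersion := "2.11.6"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.4.0" % "provided",  // supplied by the cluster at runtime
  "org.apache.hbase" % "hbase-client" % "1.0.1.1",
  "org.apache.hbase" % "hbase-common" % "1.0.1.1",
  "org.apache.hbase" % "hbase-server" % "1.0.1.1"
)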
Upvotes: 1
Reputation: 6279
I'm not sure what the cause of your error is. It looks like a method-call mismatch between the library versions in your environment.
Here is a sample of how to connect to HBase using Spark:
import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.{HBaseAdmin, Result}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
...
// assumes an existing SparkContext `sc` (e.g. from the spark-shell)
val conf = HBaseConfiguration.create()
conf.set(TableInputFormat.INPUT_TABLE, "image_table")

// Initialize: create the table if it does not exist yet
val admin = new HBaseAdmin(conf)
if (!admin.isTableAvailable("image_table")) {
  val tableDesc = new HTableDescriptor(TableName.valueOf("image_table"))
  // HBase requires at least one column family
  tableDesc.addFamily(new HColumnDescriptor("Image Data"))
  admin.createTable(tableDesc)
}

val rdd = sc.newAPIHadoopRDD(
  conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
The Result class includes various methods for getting values out of the returned rows.
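For example, a short sketch of pulling one cell out of each row; the column family "Image Data" and qualifier "1" are assumptions taken from the question's schema, so adjust them to your table:

import org.apache.hadoop.hbase.util.Bytes

// Map each (rowKey, Result) pair to the string value of a single cell
val values = rdd.map { case (_, result) =>
  Bytes.toString(result.getValue(Bytes.toBytes("Image Data"), Bytes.toBytes("1")))
}
values.collect().foreach(println)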
Upvotes: 0