ayyappa maddi

Reputation: 941

How to connect to HBase using Spark

I would like to connect to HBase using Spark, but I am getting an exception. When I try the same thing from Scala, I do not get such errors.

I am using Scala 2.10.4 and Spark 1.3.0 on CDH 5.4.0.

The code is:

import org.apache.spark.SparkContext
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.Put
import org.apache.spark.SparkConf
import com.cloudera.spark.hbase.HBaseContext

object HBaseBulkPutExample {
    def main(args: Array[String]) {
        val tableName = "image_table";
        val columnFamily = "Image Data"

        val sparkConf = new SparkConf().setAppName("HBaseBulkPutExample " + tableName + " " + columnFamily)
        val sc = new SparkContext(sparkConf)

        val rdd = sc.parallelize(Array(
            (Bytes.toBytes("1"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("1")))),
            (Bytes.toBytes("2"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("2")))),
            (Bytes.toBytes("3"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("3")))),
            (Bytes.toBytes("4"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("4")))),
            (Bytes.toBytes("5"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("5"))))
            )
        )

        val conf = HBaseConfiguration.create();
        conf.addResource(new Path("/eds/servers//hbase-1.0.1.1/conf/hbase-site.xml"));

        val hbaseContext = new HBaseContext(sc, conf);
        hbaseContext.bulkPut[(Array[Byte], Array[(Array[Byte], Array[Byte], Array[Byte])])](rdd,
           tableName,
           (putRecord) => {
               val put = new Put(putRecord._1)
               putRecord._2.foreach((putValue) => put.add(putValue._1, putValue._2, putValue._3))
               put
            },
            true);
    }
}

When I build a jar and execute it, I get the following error:

org.apache.hadoop.hbase.DoNotRetryIOException: java.lang.NoSuchMethodError: org.apache.hadoop.net.NetUtils.getInputStream(Ljava/net/Socket;)Lorg/apache/hadoop/net/SocketInputWrapper;
    at org.apache.hadoop.hbase.ipc.RpcClient$Connection.setupIOstreams(RpcClient.java:928)
    at org.apache.hadoop.hbase.ipc.RpcClient.getConnection(RpcClient.java:1543)
    at org.apache.hadoop.hbase.ipc.RpcClient.call(RpcClient.java:1442)
    at org.apache.hadoop.hbase.ipc.RpcClient.callBlockingMethod(RpcClient.java:1661)
    at org.apache.hadoop.hbase.ipc.RpcClient$BlockingRpcChannelImplementation.callBlockingMethod(RpcClient.java:1719)
    at org.apache.hadoop.hbase.protobuf.generated.ClientProtos$ClientService$BlockingStub.get(ClientProtos.java:30304)
    at org.apache.hadoop.hbase.protobuf.ProtobufUtil.getRowOrBefore(ProtobufUtil.java:1562)
    at org.apache.hadoop.hbase.client.HTable$2.call(HTable.java:711)
    at org.apache.hadoop.hbase.client.HTable$2.call(HTable.java:709)
    at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithRetries(RpcRetryingCaller.java:114)
    at org.apache.hadoop.hbase.client.HTable.getRowOrBefore(HTable.java:715)
    at org.apache.hadoop.hbase.client.MetaScanner.metaScan(MetaScanner.java:144)
    at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.prefetchRegionCache(HConnectionManager.java:1140)

Upvotes: 3

Views: 8153

Answers (2)

Rana Roy

Reputation: 11

I ran into a similar problem because of version mismatches among the Spark and HBase libraries. I upgraded to Spark 1.4 with Scala 2.11.6 and used HBase 1.0.1.1, and after that I never saw this problem again. The jar version mismatch was the cause: the Spark jar expected a newer method in the HBase client jar and failed when it was missing.
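
For reference, a minimal build.sbt sketch that keeps the Scala, Spark, and HBase versions aligned; the exact artifact coordinates and versions here are assumptions, so adjust them to whatever your distribution (e.g. CDH) actually ships:

// Hypothetical build.sbt aligning the versions mentioned above.
scalaVersion := "2.11.6"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"   % "1.4.0" % "provided",
  "org.apache.hbase" %  "hbase-client" % "1.0.1.1",
  "org.apache.hbase" %  "hbase-common" % "1.0.1.1",
  "org.apache.hbase" %  "hbase-server" % "1.0.1.1"
)

The key point is that every HBase artifact uses the same version, and that version matches the HBase client classes your Spark/Hadoop jars were compiled against.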

Upvotes: 1

AzizSM

Reputation: 6279

I'm not sure what the cause of your error is. It looks like a method mismatch with the versions in your environment.

Here is a sample of how to connect to HBase using Spark:

import org.apache.spark._
import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.{HBaseAdmin, Result}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat

...

val tableName = "image_table"

val conf = HBaseConfiguration.create()
conf.set(TableInputFormat.INPUT_TABLE, tableName)

// Create the table (with its column family) if it does not exist yet
val admin = new HBaseAdmin(conf)
if (!admin.isTableAvailable(tableName)) {
  val tableDesc = new HTableDescriptor(TableName.valueOf(tableName))
  tableDesc.addFamily(new HColumnDescriptor("Image Data"))
  admin.createTable(tableDesc)
}

// Read the table as an RDD of (row key, Result) pairs
val rdd = sc.newAPIHadoopRDD(
    conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])

The Result class includes various methods for getting values out of each row.
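
As a minimal sketch, you could pull individual cell values out of each Result like this; the "Image Data" family and qualifier "1" are assumptions taken from the question's code:

import org.apache.hadoop.hbase.util.Bytes

// Assumes the "Image Data" column family and qualifier "1" from the question.
rdd.map { case (_, result) =>
  Bytes.toString(result.getValue(Bytes.toBytes("Image Data"), Bytes.toBytes("1")))
}.collect().foreach(println)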

Upvotes: 0
