Michele

Reputation: 1488

How to configure Spark Streaming Scala app to read from HBase on Hadoop + Yarn

I am running Spark and HBase on Hadoop + YARN, and I want to read from and write to HBase from a Scala app built with SBT.

I cannot create an HBase

Scala APP:

/usr/local/sparkapps/HBaseWordCount/src/main/scala/com/mydomain/spark/hbasewordcount/HbaseWordCount.scala



package com.mydomain.spark.hbasewordcount

import org.apache.spark._
import org.apache.spark.streaming._

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf

object HBaseScalaWordCount {
    def main(args: Array[String]) {

        val name = "Example of read from HBase table"

        lazy val sparkConf = new SparkConf().setAppName(name)
        lazy val ssc = new StreamingContext(sparkConf, Seconds(1))
        implicit val config = HBaseConfig() // Assumes hbase-site.xml is on classpath

        val columns = Map(
            "cf1" -> Set("col1", "col2"),
            "cf2" -> Set("col3")
        )

        ssc.hbase[String]("testtable", columns)
        .map({ case (k, v) =>
          val cf1 = v("cf1")
          val col1 = cf1("col1")
          val col2 = cf1("col2")
          val col3 = v("cf2")("col3")

          List(k, col1, col2, col3) mkString "\t"
        })
        .saveAsTextFile("file:/home/hduser/hbasetest-output")
    }
}

SBT FILE:

/usr/local/sparkapps/HBaseWordCount/HBaseWordCount.sbt


name := "HBaseScalaWordCount"

version := "1.0"

scalaVersion := "2.10.6"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.6.1" % "provided",
  "org.apache.spark" %% "spark-streaming" % "1.6.1" % "provided",
  "org.apache.hbase" % "hbase-common" % "1.2.1" % "provided",
  "org.apache.hbase" % "hbase-client" % "1.2.1" % "provided",
  "org.apache.hbase" % "hbase-server" % "1.2.1" % "provided",
  "eu.unicredit" %% "hbase-rdd" % "0.7.1"
)

SBT PACKAGE

/usr/local/sparkapps/HBaseWordCount$ sbt package


[info] Set current project to HBaseScalaWordCount (in build file:/usr/local/sparkapps/HBaseWordCount/)
[info] Compiling 1 Scala source to /usr/local/sparkapps/HBaseWordCount/target/scala-2.10/classes...
[error] /usr/local/sparkapps/HBaseWordCount/src/main/scala/com/mydomain/spark/hbasewordcount/HbaseWordCount.scala:29: not found: value HBaseConfig
[error]         implicit val config = HBaseConfig() // Assumes hbase-site.xml is on classpath
[error]                               ^
[error] /usr/local/sparkapps/HBaseWordCount/src/main/scala/com/mydomain/spark/hbasewordcount/HbaseWordCount.scala:36: value hbase is not a member of org.apache.spark.streaming.StreamingContext
[error]         ssc.hbase[String]("testtable", columns)
[error]             ^
[error] two errors found
[error] (compile:compileIncremental) Compilation failed
[error] Total time: 9 s, completed Apr 14, 2016 4:11:40 PM    

HBase is working correctly over Hadoop, but I cannot understand how to configure the classpath for Spark, for example in /usr/local/spark/conf/spark-defaults.conf. That file doesn't actually exist; I only have spark-defaults.conf.template.

SPARK-ENV.SH:

/usr/local/spark/conf/spark-env.sh

export SPARK_MASTER_IP=localhost
export SPARK_WORKER_CORES=1
export SPARK_WORKER_MEMORY=800m
export SPARK_WORKER_INSTANCES=1

SPARK-DEFAULTS.CONF:

doesn't exist

HBASE PATH:

/usr/local/hbase/hbase-1.1.3/lib/

HBASE_SITE.XML:

/usr/local/hbase/hbase-1.1.3/conf/hbase-site.xml 


<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>


<configuration>

  <property>
    <name>hbase.rootdir</name>
    <value>hdfs://localhost:9000/hbase</value>
  </property>

  <property>
    <name>hbase.cluster.distributed</name>
    <value>true</value>
  </property>

  <property>
    <name>hbase.zookeeper.quorum</name>
    <value>localhost</value>
  </property>

  <property>
    <name>dfs.replication</name>
    <value>1</value>
  </property>

  <property>
    <name>hbase.zookeeper.property.clientPort</name>
    <value>2181</value>
  </property>

  <property>
    <name>hbase.zookeeper.property.dataDir</name>
    <value>/home/hduser/hbase/zookeeper</value>
  </property>

</configuration>

Upvotes: 0

Views: 981

Answers (1)

Daniel Zolnai

Reputation: 16910

First of all, the compiler can't find HBaseConfig. This is because you have imported org.apache.hadoop.hbase.HBaseConfiguration, but the class you need is unicredit.spark.hbase.HBaseConfig.

Your second problem is

value hbase is not a member of org.apache.spark.streaming.StreamingContext

This means the compiler can't find an hbase method on StreamingContext. I see you are using hbase-rdd to add HBase support to Spark. According to that project's README, you have to import its implicits, so add this line to the top of your file:

import unicredit.spark.hbase._

Implicits are a nice feature of Scala that can extend the functionality of classes from other packages. With the implicits imported, the hbase method should be available on your SparkContext instance.
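
To illustrate the mechanism on its own, here is a minimal sketch that uses neither Spark nor hbase-rdd. Context, ContextOps and describe are hypothetical names made up for this example; the point is only that an implicit class in scope makes a new method callable on a type you did not write.

```scala
object ImplicitsSketch {
  // Stand-in for a class from another package that we cannot modify (hypothetical).
  final class Context(val name: String)

  // The implicit class wraps Context and adds a describe method to it.
  // When ctx.describe(...) is called, the compiler inserts the wrapping for us.
  implicit class ContextOps(val ctx: Context) extends AnyVal {
    def describe(table: String): String = s"${ctx.name} reads $table"
  }

  def main(args: Array[String]): Unit = {
    val ctx = new Context("demo")
    // describe is not defined on Context itself; it resolves through ContextOps.
    println(ctx.describe("testtable"))
  }
}
```

If ContextOps is not in scope (for example, because the import is missing), the call fails with a "value describe is not a member of ..." error, which is the same kind of error you are seeing for hbase.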

Note that you don't have a SparkContext instance yet, only a StreamingContext, so create one first. Also, it is unnecessary to make them lazy.
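
Putting these points together, the app might look roughly like the sketch below. It assumes the hbase-rdd API as shown in that project's README (HBaseConfig() and sc.hbase[String], both brought in by unicredit.spark.hbase._), and it replaces the StreamingContext with a plain SparkContext. The table name, columns and output path are taken from your question. I have not run this against your setup, so treat it as a starting point rather than a verified fix.

```scala
package com.mydomain.spark.hbasewordcount

import org.apache.spark.{SparkConf, SparkContext}
// Brings HBaseConfig and the implicit conversions that add hbase(...) to SparkContext
// into scope (assumption: hbase-rdd 0.7.1, as declared in the question's sbt file).
import unicredit.spark.hbase._

object HBaseScalaWordCount {
  def main(args: Array[String]): Unit = {
    val name = "Example of read from HBase table"

    val sparkConf = new SparkConf().setAppName(name)
    val sc = new SparkContext(sparkConf) // plain vals, no need for lazy

    // Assumes hbase-site.xml is on the classpath
    implicit val config = HBaseConfig()

    // Column families mapped to the columns to read from each
    val columns = Map(
      "cf1" -> Set("col1", "col2"),
      "cf2" -> Set("col3")
    )

    sc.hbase[String]("testtable", columns)
      .map { case (k, v) =>
        val cf1 = v("cf1")
        val col1 = cf1("col1")
        val col2 = cf1("col2")
        val col3 = v("cf2")("col3")

        List(k, col1, col2, col3).mkString("\t")
      }
      .saveAsTextFile("file:/home/hduser/hbasetest-output")

    sc.stop()
  }
}
```

If you still need a StreamingContext elsewhere in the app, you can build it from the same SparkContext with new StreamingContext(sc, Seconds(1)) instead of creating two separate contexts.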

Upvotes: 1
