We are streaming Kafka data that is collected from MySQL. Once all the analytics are done, I want to save my data directly to HBase. I have gone through the Spark Structured Streaming documentation but couldn't find any sink for HBase. The code I used to read the data from Kafka is below.
val records = spark.readStream.format("kafka").option("subscribe", "kaapociot").option("kafka.bootstrap.servers", "XX.XX.XX.XX:6667").option("startingOffsets", "earliest").load
val jsonschema = StructType(Seq(StructField("header", StringType, true),StructField("event", StringType, true)))
val uschema = StructType(Seq(
StructField("MeterNumber", StringType, true),
StructField("Utility", StringType, true),
StructField("VendorServiceNumber", StringType, true),
StructField("VendorName", StringType, true),
StructField("SiteNumber", StringType, true),
StructField("SiteName", StringType, true),
StructField("Location", StringType, true),
StructField("timestamp", LongType, true),
StructField("power", DoubleType, true)
))
val DF_Hbase = records.selectExpr("cast (value as string) as Json").select(from_json($"json",schema=jsonschema).as("data")).select("data.event").select(from_json($"event", uschema).as("mykafkadata")).select("mykafkadata.*")
Finally, I want to save the DF_Hbase DataFrame to HBase.
1- Add these libraries to your project:
"org.apache.hbase" % "hbase-client" % "2.0.1"
"org.apache.hbase" % "hbase-common" % "2.0.1"
2- Add this trait to your code:
import java.util.concurrent.ExecutorService
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Put, Table}
import org.apache.hadoop.hbase.security.User
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.spark.sql.ForeachWriter
trait HBaseForeachWriter[RECORD] extends ForeachWriter[RECORD] {
  val tableName: String
  val hbaseConfResources: Seq[String]
  def pool: Option[ExecutorService] = None
  def user: Option[User] = None
  private var hTable: Table = _
  private var connection: Connection = _
  // Called once per partition and epoch; returning true tells Spark to process the rows.
  override def open(partitionId: Long, version: Long): Boolean = {
    connection = createConnection()
    hTable = getHTable(connection)
    true
  }
  def createConnection(): Connection = {
    val hbaseConfig = HBaseConfiguration.create()
    // Load the cluster configuration files listed in hbaseConfResources.
    hbaseConfResources.foreach(r => hbaseConfig.addResource(r))
    ConnectionFactory.createConnection(hbaseConfig, pool.orNull, user.orNull)
  }
  def getHTable(connection: Connection): Table = {
    connection.getTable(TableName.valueOf(tableName))
  }
  override def process(record: RECORD): Unit = {
    val put = toPut(record)
    hTable.put(put)
  }
  override def close(errorOrNull: Throwable): Unit = {
    hTable.close()
    connection.close()
  }
  // Implement this to map one record to an HBase Put.
  def toPut(record: RECORD): Put
}
3- Use it in your logic:
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes
val ds = .... //anyDataset[WhatEverYourDataType]
val query = ds.writeStream
  .foreach(new HBaseForeachWriter[WhatEverYourDataType] {
    override val tableName: String = "hbase-table-name"
    // Your cluster configuration files; I assume here that they are in resources.
    override val hbaseConfResources: Seq[String] = Seq("core-site.xml", "hbase-site.xml")
    override def toPut(record: WhatEverYourDataType): Put = {
      val key = .....
      val columnFamilyName : String = ....
      val columnName : String = ....
      val columnValue = ....
      val p = new Put(Bytes.toBytes(key))
      //Add columns ...
      p.addColumn(Bytes.toBytes(columnFamilyName), Bytes.toBytes(columnName), Bytes.toBytes(columnValue))
      p
    }
  })
  .start()
query.awaitTermination()
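For the DataFrame in the question, `toPut` could look roughly like the sketch below. It is only an illustration: the `MeterReading` case class, the `MeterReadingPuts` object, the column family `"d"`, and the row-key layout (meter number plus zero-padded timestamp) are all assumptions, not something the question specifies. It needs the `hbase-client` dependency from step 1.

```scala
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes

// Hypothetical case class mirroring the fields of `uschema` from the question.
case class MeterReading(
  MeterNumber: String, Utility: String, VendorServiceNumber: String,
  VendorName: String, SiteNumber: String, SiteName: String,
  Location: String, timestamp: Long, power: Double)

object MeterReadingPuts {
  // Assumption: the target table already has a column family named "d".
  private val cf: Array[Byte] = Bytes.toBytes("d")

  // Assumption: meter number plus zero-padded timestamp is unique per reading.
  def rowKey(r: MeterReading): String = f"${r.MeterNumber}%s#${r.timestamp}%019d"

  // Adds a string column, skipping nulls (every field in `uschema` is nullable).
  private def addString(p: Put, name: String, value: String): Unit =
    if (value != null) p.addColumn(cf, Bytes.toBytes(name), Bytes.toBytes(value))

  def toPut(r: MeterReading): Put = {
    val p = new Put(Bytes.toBytes(rowKey(r)))
    addString(p, "Utility", r.Utility)
    addString(p, "VendorServiceNumber", r.VendorServiceNumber)
    addString(p, "VendorName", r.VendorName)
    addString(p, "SiteNumber", r.SiteNumber)
    addString(p, "SiteName", r.SiteName)
    addString(p, "Location", r.Location)
    p.addColumn(cf, Bytes.toBytes("power"), Bytes.toBytes(r.power))
    p
  }
}
```

With `import spark.implicits._` in scope, `DF_Hbase.as[MeterReading]` would give a typed Dataset, and an `HBaseForeachWriter[MeterReading]` could delegate its `toPut` to `MeterReadingPuts.toPut`. Rows whose `timestamp` or `power` is null would need to be filtered out first, since the case class uses primitive types for those fields.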
This method worked for me, even with PySpark:
package HBase
import org.apache.spark.internal.Logging
import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider}
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.execution.datasources.hbase._
class HBaseSink(options: Map[String, String]) extends Sink with Logging {
  // String with HBaseTableCatalog.tableCatalog
  private val hBaseCatalog = options.get("hbasecat").map(_.toString).getOrElse("")
  override def addBatch(batchId: Long, data: DataFrame): Unit = synchronized {
    // Rebuild a batch DataFrame from the micro-batch so it can be written with the batch writer.
    val df = data.sparkSession.createDataFrame(data.rdd, data.schema)
    df.write
      .options(Map(HBaseTableCatalog.tableCatalog -> hBaseCatalog,
        HBaseTableCatalog.newTable -> "5"))
      .format("org.apache.spark.sql.execution.datasources.hbase")
      .save()
  }
}
class HBaseSinkProvider extends StreamSinkProvider with DataSourceRegister {
  def createSink(
      sqlContext: SQLContext,
      parameters: Map[String, String],
      partitionColumns: Seq[String],
      outputMode: OutputMode): Sink = {
    new HBaseSink(parameters)
  }
  def shortName(): String = "hbase"
}
I added a file named HBaseSinkProvider.scala to shc/core/src/main/scala/org/apache/spark/sql/execution/datasources/hbase
and built it. The example works perfectly.
Here is an example of how to use it (Scala):
inputDF.
writeStream.
queryName("hbase writer").
format("HBase.HBaseSinkProvider").
option("checkpointLocation", checkPointProdPath).
option("hbasecat", catalog).
start
And an example of how I use it in Python:
inputDF \
.writeStream \
.outputMode("append") \
.format('HBase.HBaseSinkProvider') \
.option('hbasecat', catalog_kafka) \
.option("checkpointLocation", '/tmp/checkpoint') \
.start()
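Both usage examples pass a catalog string through the `hbasecat` option without showing it. As a rough sketch, an SHC catalog for the columns in the question might look like the following. The table name `meter_readings`, the column family `d`, and the choice of `MeterNumber` as the row key are assumptions made for illustration. With that row key, a later reading for the same meter would overwrite the earlier one, so a real table would probably need a different key.

```scala
// Hypothetical SHC catalog for the columns in the question's `uschema`.
// Assumptions: table "meter_readings" in namespace "default", one column
// family "d", and MeterNumber used as the row key.
object MeterCatalog {
  val catalog: String =
    """{
      |  "table": {"namespace": "default", "name": "meter_readings"},
      |  "rowkey": "key",
      |  "columns": {
      |    "MeterNumber": {"cf": "rowkey", "col": "key", "type": "string"},
      |    "Utility": {"cf": "d", "col": "Utility", "type": "string"},
      |    "VendorServiceNumber": {"cf": "d", "col": "VendorServiceNumber", "type": "string"},
      |    "VendorName": {"cf": "d", "col": "VendorName", "type": "string"},
      |    "SiteNumber": {"cf": "d", "col": "SiteNumber", "type": "string"},
      |    "SiteName": {"cf": "d", "col": "SiteName", "type": "string"},
      |    "Location": {"cf": "d", "col": "Location", "type": "string"},
      |    "timestamp": {"cf": "d", "col": "timestamp", "type": "bigint"},
      |    "power": {"cf": "d", "col": "power", "type": "double"}
      |  }
      |}""".stripMargin
}
```

The keys under `columns` are the DataFrame column names, while `cf` and `col` name the HBase column family and qualifier. The special family `rowkey` marks the column that becomes the row key.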
Are you processing the data coming from Kafka, or just pumping it to HBase? An option to consider is Kafka Connect. It gives you a configuration-file-based approach for integrating Kafka with other systems, including HBase.
