Pookly

Reputation: 143

Can Spark SQL create NoSQL tables?

From what I have learned so far, I think Spark SQL can create Hive tables, since Spark SQL evolved from Shark, which was itself built on Hive:

hive create table
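
For context, a minimal sketch of the Hive case (the table name and columns here are made up), assuming a SparkSession with Hive support enabled:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("HiveCreateTableExample")
      .enableHiveSupport()
      .getOrCreate()

    // issues the DDL against the Hive metastore
    spark.sql(
      """CREATE TABLE IF NOT EXISTS people (
        |  id INT,
        |  name STRING,
        |  country STRING,
        |  pincode STRING)
        |STORED AS PARQUET""".stripMargin)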

Can spark-sql also create tables in other NoSQL stores, such as HBase, Cassandra, or Elasticsearch?

I searched the relevant documentation but did not find a spark-sql API for creating such tables. Will it be supported in the future?

Upvotes: 1

Views: 120

Answers (1)

sathya

Reputation: 2072

Creating indexes in Elasticsearch or tables in HBase and Cassandra directly through spark-sql is not possible at the moment. You can use the hbase-client library to interact with HBase natively: https://mvnrepository.com/artifact/org.apache.hbase/hbase-client. For Cassandra, a similar native-driver sketch is shown after the HBase steps below.

1. Creating a table in HBase

      import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
      import org.apache.hadoop.hbase.client.ConnectionFactory

      // obtain an Admin handle from an HBase connection
      val conf = HBaseConfiguration.create()
      val conn = ConnectionFactory.createConnection(conf)
      val admin = conn.getAdmin

      // table "htd" with a single column family "personal"; step 2 below writes
      // the name, country and pincode columns under this family
      val myTable = TableName.valueOf("htd")
      if (!admin.tableExists(myTable)) {
        val htd = new HTableDescriptor(myTable)
        htd.addFamily(new HColumnDescriptor("personal"))
        admin.createTable(htd)
      }
      conn.close()

2. Writing to the HBase table created above

import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor}
import org.apache.spark.sql.SparkSession
import org.apache.hadoop.hbase.client.ConnectionFactory
import org.apache.hadoop.hbase.client.Connection
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes

        val kafkaParams = Map[String, Object](
            "bootstrap.servers" -> "cmaster.localcloud.com:9092,cworker2.localcloud.com:9092,cworker1.localcloud.com:9092",
            "key.deserializer" -> classOf[StringDeserializer],
            "value.deserializer" -> classOf[StringDeserializer],
            "group.id" -> "use_a_separate_group_id_for_each_stream",
            "auto.offset.reset" -> "latest",
            "enable.auto.commit" -> (false: java.lang.Boolean)

        )

        val topics = Array("test")
        val spark = SparkSession.builder().master("local[8]").appName("KafkaSparkHBasePipeline").getOrCreate()
        spark.sparkContext.setLogLevel("OFF")
        // create a streaming context and subscribe to the Kafka topic so that
        // kafkaStream is defined for the transformations below
        val ssc = new StreamingContext(spark.sparkContext, Seconds(5))
        val kafkaStream = KafkaUtils.createDirectStream[String, String](
            ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams))

        val result = kafkaStream.map(record => (record.key, record.value))
        result.foreachRDD(x => {
            // each Kafka value is expected to be a comma-separated record: id,name,country,pincode
            val cols = x.map(x => x._2.split(","))
            val arr = cols.map(x => {
                val id = x(0)
                val name = x(1)
                val country = x(2)
                val pincode = x(3)
                (id,name,country,pincode)
            })
            arr.foreachPartition { iter =>
                // build the HBase connection once per partition rather than per record
                val conf = HBaseConfiguration.create()
                conf.set("hbase.zookeeper.quorum", "cmaster.localcloud.com")
                conf.set("hbase.rootdir", "hdfs://localhost:8020/hbase")
                conf.set("hbase.zookeeper.property.clientPort", "2181")
                conf.set("zookeeper.znode.parent", "/hbase")
                conf.set("hbase.unsafe.stream.capability.enforce", "false")
                conf.set("hbase.cluster.distributed", "true")
                val conn = ConnectionFactory.createConnection(conf)
                import org.apache.hadoop.hbase.TableName
                val tableName = "htd"
                val table = TableName.valueOf(tableName)
                val HbaseTable = conn.getTable(table)
                val cfPersonal = "personal"
                iter.foreach(x => {
                    val keyValue = "Key_" + x._1
                    val id = new Put(Bytes.toBytes(keyValue))
                    val name = x._2.toString
                    val country = x._3.toString
                    val pincode = x._4.toString
                    id.addColumn(Bytes.toBytes(cfPersonal), Bytes.toBytes("name"), Bytes.toBytes(name))
                    id.addColumn(Bytes.toBytes(cfPersonal), Bytes.toBytes("country"), Bytes.toBytes(country))
                    id.addColumn(Bytes.toBytes(cfPersonal), Bytes.toBytes("pincode"), Bytes.toBytes(pincode))
                    HbaseTable.put(id)
                })
                HbaseTable.close()
                conn.close()
            }
        })

        ssc.start()
        ssc.awaitTermination()

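To feed the stream above when testing, comma-separated records in the id,name,country,pincode order the code expects can be pushed to the test topic, for example with the Kafka console producer (broker address as assumed in kafkaParams above; the sample rows are only illustrative):

    kafka-console-producer --broker-list cmaster.localcloud.com:9092 --topic test
    >1010,Mark,USA,54321
    >1011,John,CA,98765
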
3. Dependencies used for this project (build.sbt)

name := "KafkaSparkHBasePipeline"
version := "0.1"
scalaVersion := "2.11.8"
resolvers += "Mavenrepository" at "https://mvnrepository.com"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.2"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.2"
libraryDependencies += "org.apache.hbase" % "hbase-client" % "2.2.0"
libraryDependencies += "org.apache.kafka" %% "kafka" % "2.3.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.4.3"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.4.3"

4. Verifying the data in HBase

[smart@cmaster sathyadev]$ hbase shell
Java HotSpot(TM) 64-Bit Server VM warning: Using incremental CMS is deprecated and will likely be removed in a future release
HBase Shell
Use "help" to get list of supported commands. Use "exit" to quit this interactive shell.
For Reference, please visit: http://hbase.apache.org/2.0/book.html
Version 2.1.0-cdh6.2.1, rUnknown, Wed Sep 11 01:05:56 PDT 2019
Took 0.0064 seconds
hbase(main):001:0> scan 'htd'
ROW                COLUMN+CELL
 Key_1010          column=personal:country, timestamp=1584125319078, value=USA
 Key_1010          column=personal:name, timestamp=1584125319078, value=Mark
 Key_1010          column=personal:pincode, timestamp=1584125319078, value=54321
 Key_1011          column=personal:country, timestamp=1584125320073, value=CA
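
For Cassandra, as noted at the top of this answer, the table cannot be created through spark-sql either; the usual workaround is to issue the DDL through a native client. A minimal sketch using the DataStax Java driver (the contact point, keyspace and table names are made up for illustration):

    import com.datastax.oss.driver.api.core.CqlSession
    import java.net.InetSocketAddress

    // connect to a Cassandra node (hypothetical contact point and datacenter)
    val session = CqlSession.builder()
      .addContactPoint(new InetSocketAddress("127.0.0.1", 9042))
      .withLocalDatacenter("datacenter1")
      .build()

    // create a keyspace and a table, analogous to step 1 above for HBase
    session.execute(
      """CREATE KEYSPACE IF NOT EXISTS mykeyspace
        |WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}""".stripMargin)
    session.execute(
      """CREATE TABLE IF NOT EXISTS mykeyspace.people (
        |  id text PRIMARY KEY,
        |  name text,
        |  country text,
        |  pincode text)""".stripMargin)

    session.close()

The corresponding dependency would be "com.datastax.oss" % "java-driver-core", added alongside the build.sbt entries above.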

Upvotes: 1
