Reputation: 143
From what I have learned so far, I think Spark SQL can create Hive tables, since Spark SQL grew out of Shark, which in turn was based on Hive.
Can Spark SQL also create tables in NoSQL stores such as HBase, Cassandra, or Elasticsearch?
I searched the relevant documentation but did not find a table-creation API in Spark SQL. Will one be supported in the future?
Upvotes: 1
Views: 120
Reputation: 2072
Creating indexes in Elasticsearch, or tables in HBase and Cassandra, directly through Spark SQL is not possible at the moment.
For HBase you can use the hbase-client library to interact with it natively: https://mvnrepository.com/artifact/org.apache.hbase/hbase-client
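That said, while Spark SQL DDL cannot create them, the DataFrame writers from the respective connectors can still write to these stores. A rough sketch, assuming the spark-cassandra-connector and elasticsearch-spark libraries are on the classpath (df, the keyspace, table, and index names here are hypothetical):

// df is any DataFrame whose schema matches the target table/index
// Cassandra: the keyspace and table must already exist (created via CQL)
df.write
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "ks", "table" -> "users"))
  .save()

// Elasticsearch: the index is created automatically on first write
df.write
  .format("org.elasticsearch.spark.sql")
  .save("users/docs")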
1. Creating a table in HBase:
import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.ConnectionFactory

val conf = HBaseConfiguration.create()
val conn = ConnectionFactory.createConnection(conf)
val admin = conn.getAdmin  // HBaseAdmin's constructors are not public in HBase 2.x

val myTable = TableName.valueOf("htd")
if (!admin.tableExists(myTable)) {
  val htd = new HTableDescriptor(myTable)
  // the column family must match the one the pipeline writes to in step 2
  htd.addFamily(new HColumnDescriptor("personal"))
  admin.createTable(htd)
}
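Side note: HTableDescriptor and HColumnDescriptor are deprecated in hbase-client 2.x; a minimal equivalent sketch using the builder API instead:

import org.apache.hadoop.hbase.client.{ColumnFamilyDescriptorBuilder, TableDescriptorBuilder}

// same effect as the HTableDescriptor version above
val desc = TableDescriptorBuilder.newBuilder(myTable)
  .setColumnFamily(ColumnFamilyDescriptorBuilder.of("personal"))
  .build()
admin.createTable(desc)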
2. Writing to the HBase table that was created:
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "cmaster.localcloud.com:9092,cworker2.localcloud.com:9092,cworker1.localcloud.com:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "use_a_separate_group_id_for_each_stream",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array("test")
val spark = SparkSession.builder().master("local[8]").appName("KafkaSparkHBasePipeline").getOrCreate()
spark.sparkContext.setLogLevel("OFF")

// direct Kafka stream with a 5-second batch interval (adjust as needed)
val ssc = new StreamingContext(spark.sparkContext, Seconds(5))
val kafkaStream = KafkaUtils.createDirectStream[String, String](
  ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams))

val result = kafkaStream.map(record => (record.key, record.value))
result.foreachRDD(rdd => {
  // each Kafka message is a CSV line: id,name,country,pincode
  val rows = rdd.map(pair => {
    val fields = pair._2.split(",")
    (fields(0), fields(1), fields(2), fields(3))
  })
  rows.foreachPartition { iter =>
    // HBase connections are not serializable, so open one per partition
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "cmaster.localcloud.com")
    conf.set("hbase.rootdir", "hdfs://localhost:8020/hbase")
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    conf.set("zookeeper.znode.parent", "/hbase")
    conf.set("hbase.unsafe.stream.capability.enforce", "false")
    conf.set("hbase.cluster.distributed", "true")
    val conn = ConnectionFactory.createConnection(conf)
    val hbaseTable = conn.getTable(TableName.valueOf("htd"))
    val cfPersonal = "personal"
    iter.foreach { case (id, name, country, pincode) =>
      val put = new Put(Bytes.toBytes("Key_" + id))
      put.addColumn(Bytes.toBytes(cfPersonal), Bytes.toBytes("name"), Bytes.toBytes(name))
      put.addColumn(Bytes.toBytes(cfPersonal), Bytes.toBytes("country"), Bytes.toBytes(country))
      put.addColumn(Bytes.toBytes(cfPersonal), Bytes.toBytes("pincode"), Bytes.toBytes(pincode))
      hbaseTable.put(put)
    }
    hbaseTable.close()
    conn.close()
  }
})

ssc.start()
ssc.awaitTermination()
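Note on the design: the HBase connection is opened inside foreachPartition rather than on the driver, because Connection is not serializable and cannot be shipped to executors; opening it once per partition (instead of once per record) also keeps the connection overhead down.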
3. Dependencies used for this project (build.sbt):
name := "KafkaSparkHBasePipeline"
version := "0.1"
scalaVersion := "2.11.8"

resolvers += "MavenRepository" at "https://mvnrepository.com"

// all Spark modules should share the same version to avoid binary incompatibilities
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.3"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.3"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.4.3"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.4.3"
libraryDependencies += "org.apache.hbase" % "hbase-client" % "2.2.0"
libraryDependencies += "org.apache.kafka" %% "kafka" % "2.3.0"
4. To verify the data in HBase:
[smart@cmaster sathyadev]$ hbase shell
Java HotSpot(TM) 64-Bit Server VM warning: Using incremental CMS is deprecated and will likely be removed in a future release
HBase Shell
Use "help" to get list of supported commands.
Use "exit" to quit this interactive shell.
For Reference, please visit: http://hbase.apache.org/2.0/book.html
Version 2.1.0-cdh6.2.1, rUnknown, Wed Sep 11 01:05:56 PDT 2019
Took 0.0064 seconds
hbase(main):001:0> scan 'htd';
ROW                COLUMN+CELL
 Key_1010          column=personal:country, timestamp=1584125319078, value=USA
 Key_1010          column=personal:name, timestamp=1584125319078, value=Mark
 Key_1010          column=personal:pincode, timestamp=1584125319078, value=54321
 Key_1011          column=personal:country, timestamp=1584125320073, value=CA
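For a programmatic spot check instead of the shell, the same hbase-client API can read a row back (a small sketch reusing the quorum settings from step 2):

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Get}
import org.apache.hadoop.hbase.util.Bytes

val conf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum", "cmaster.localcloud.com")
conf.set("hbase.zookeeper.property.clientPort", "2181")
val conn = ConnectionFactory.createConnection(conf)
val table = conn.getTable(TableName.valueOf("htd"))

// fetch one row written by the pipeline
val row = table.get(new Get(Bytes.toBytes("Key_1010")))
println(Bytes.toString(row.getValue(Bytes.toBytes("personal"), Bytes.toBytes("name"))))  // Mark

table.close()
conn.close()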
Upvotes: 1