Reputation: 772
There is a need to read a stream of structured data from Kafka and write it to an already existing Hive table. Upon analysis, it appears that one option is to do a readStream of the Kafka source and then a writeStream to a File sink at an HDFS path.
My question here is: is it possible to write directly to a Hive table? Or is there a workaround approach that can be followed for this use case?
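For reference, the file-sink option mentioned above would look roughly like this; the broker, topic, HDFS output path and checkpoint location below are just placeholders:
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("KafkaToHdfs").getOrCreate()

// read the Kafka topic as a stream (placeholder broker/topic)
val kafkaDF = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092")
  .option("subscribe", "some_topic")
  .load()

// write each micro-batch as Parquet files under an HDFS path;
// an external Hive table could then be defined over that location
val fileQuery = kafkaDF.selectExpr("CAST(value AS STRING)")
  .writeStream
  .format("parquet")
  .option("path", "hdfs:///some/output/path")
  .option("checkpointLocation", "hdfs:///some/checkpoint/path")
  .start()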
EDIT1:
.foreachBatch seems to be working, but it has the issue mentioned below.
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{Dataset, SaveMode, SparkSession}
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}
val spark = SparkSession.builder.appName("StructuredNetworkWordCount").getOrCreate()
import spark.implicits._

// subscribe to the Kafka topic
val csvDF = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "xxxxxx912:9092")
  .option("subscribe", "testtest")
  .load()

// cast the Kafka metadata and payload columns to strings
val abcd = csvDF.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "CAST(topic AS STRING)",
    "CAST(offset AS STRING)", "CAST(partition AS STRING)", "CAST(timestamp AS STRING)")
  .as[(String, String, String, String, String, String)]

// append each micro-batch into the existing Hive table
val query = abcd.writeStream
  .foreachBatch((batchDs: Dataset[_], batchId: Long) => {
    batchDs.write.mode(SaveMode.Append).insertInto("default.6columns")
  })
  .option("quote", "\u0000")
  .start()
hive> select * from 6columns;
OK
0 A3,L1,G1,P1,O1,101,TXN1 testtest 122 0 2019-05-23 12:38:49.515
0 A3,L1,G1,P1,O1,102,TXN2 testtest 123 0 2019-05-23 12:38:49.524
0 A1,L1,G1,P1,O1,100,TXN3 testtest 124 0 2019-05-23 12:38:49.524
0 A2,L2,G1,P1,O2,100,TXN4 testtest 125 0 2019-05-23 12:38:49.524
0 A3,L1,G1,P1,O1,103,TXN5 testtest 126 0 2019-05-23 12:38:54.525
0 A3,L1,G1,P1,O1,104,TXN6 testtest 127 0 2019-05-23 12:38:55.525
0 A4,L1,G1,P1,O1,100,TXN7 testtest 128 0 2019-05-23 12:38:56.526
0 A1,L1,G1,P1,O1,500,TXNID8 testtest 129 0 2019-05-23 12:38:57.526
0 A6,L2,G2,P1,O1,500,TXNID9 testtest 130 0 2019-05-23 12:38:57.526
What I am looking for is to split the value attribute of the Kafka message so that the data resembles the Hive table, which effectively makes it a 12-column table (A3,L1,G1,P1,O1,101,TXN1 splits into 7 attributes). I need some additional transformation, similar to the .option("quote", "\u0000") that I applied while writing the dataframe, but it doesn't seem to be working.
Upvotes: 1
Views: 2517
Reputation: 1330
Once you have your stream set up and consuming from Kafka, you can use the foreachBatch function like so.
val yourStream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "your_topic")
  .load()

val query = yourStream.writeStream
  .foreachBatch((batchDs: Dataset[_], batchId: Long) => {
    batchDs
      .write
      .mode(SaveMode.Append)
      .insertInto("your_db.your_table")
  })
  .start()

query.awaitTermination()
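Note that insertInto assumes the SparkSession can see the Hive metastore. If you are building the session yourself rather than using a spark-shell that already has Hive configured, you would typically enable Hive support when constructing it, something like this sketch (the app name is a placeholder):
// build a session with Hive support so insertInto can resolve "your_db.your_table"
val spark = SparkSession.builder
  .appName("KafkaToHive")
  .enableHiveSupport()
  .getOrCreate()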
To get the String split by "," into separate columns, you can use the split function to get all the items delimited by a "," into an array, and then select the items individually by index, e.g. "SPLIT(CAST(value AS STRING), ',')[0]" to get the first element.
So replace
val abcd = csvDF.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)","CAST(topic AS STRING)","CAST(offset AS STRING)","CAST(partition AS STRING)","CAST(timestamp AS STRING)").as[(String, String, String, String, String, String)]
with
val abcd = csvDF.selectExpr("CAST(key AS STRING)", "SPLIT(CAST(value AS STRING), ',')[0]", "SPLIT(CAST(value AS STRING), ',')[1]", "SPLIT(CAST(value AS STRING), ',')[2]", "SPLIT(CAST(value AS STRING), ',')[3]", "SPLIT(CAST(value AS STRING), ',')[4]", "SPLIT(CAST(value AS STRING), ',')[5]", "SPLIT(CAST(value AS STRING), ',')[6]", "CAST(topic AS STRING)", "CAST(offset AS STRING)", "CAST(partition AS STRING)", "CAST(timestamp AS STRING)").as[(String, String, String, String, String, String, String, String, String, String, String, String)]
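Equivalently, to avoid repeating the SPLIT expression, you could perform the split once with the DataFrame functions API and pick elements with getItem. This is an untested sketch along the same lines, not code from the original post:
import org.apache.spark.sql.functions.{col, split}

// split the value column once, then project each element by index
val parts = split(col("value").cast("string"), ",")
val abcd = csvDF.select(
    col("key").cast("string"),
    parts.getItem(0), parts.getItem(1), parts.getItem(2), parts.getItem(3),
    parts.getItem(4), parts.getItem(5), parts.getItem(6),
    col("topic").cast("string"), col("offset").cast("string"),
    col("partition").cast("string"), col("timestamp").cast("string"))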
Upvotes: 2