Prashant

Reputation: 772

How to read stream of structured data and write to Hive table

There is a need to read a stream of structured data from Kafka and write it to an already existing Hive table. Upon analysis, it appears that one option is to readStream from the Kafka source and then writeStream to a File sink at an HDFS path.

My question here is: is it possible to write directly to a Hive table? Or is there a workaround approach for this use case?
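
For reference, a minimal sketch of the file-sink alternative mentioned above (the HDFS paths are placeholders; the broker and topic match the code in the edit below):

// Hypothetical sketch: land the Kafka stream as Parquet files on HDFS.
val fileQuery = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "xxxxxx912:9092")
  .option("subscribe", "testtest")
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("parquet")
  .option("path", "hdfs:///user/hive/warehouse/landing")            // placeholder target path
  .option("checkpointLocation", "hdfs:///tmp/checkpoints/landing")  // file sinks require a checkpoint
  .start()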

EDIT1:

.foreachBatch seems to work, but it has the issue described below.

import org.apache.spark.sql.functions._
import org.apache.spark.sql.{Dataset, SaveMode, SparkSession}

val spark = SparkSession.builder.appName("StructuredNetworkWordCount").getOrCreate()
import spark.implicits._

// Subscribe to the Kafka topic
val csvDF = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "xxxxxx912:9092")
  .option("subscribe", "testtest")
  .load()

// Cast the Kafka message fields to strings
val abcd = csvDF.selectExpr(
    "CAST(key AS STRING)", "CAST(value AS STRING)", "CAST(topic AS STRING)",
    "CAST(offset AS STRING)", "CAST(partition AS STRING)", "CAST(timestamp AS STRING)")
  .as[(String, String, String, String, String, String)]

// Append each micro-batch to the existing Hive table
val query = abcd.writeStream
  .foreachBatch { (batchDs: Dataset[_], batchId: Long) =>
    batchDs.write.mode(SaveMode.Append).insertInto("default.6columns")
  }
  .option("quote", "\u0000")
  .start()

hive> select * from 6columns;
OK
0       A3,L1,G1,P1,O1,101,TXN1     testtest        122     0       2019-05-23 12:38:49.515
0       A3,L1,G1,P1,O1,102,TXN2     testtest        123     0       2019-05-23 12:38:49.524
0       A1,L1,G1,P1,O1,100,TXN3     testtest        124     0       2019-05-23 12:38:49.524
0       A2,L2,G1,P1,O2,100,TXN4     testtest        125     0       2019-05-23 12:38:49.524
0       A3,L1,G1,P1,O1,103,TXN5     testtest        126     0       2019-05-23 12:38:54.525
0       A3,L1,G1,P1,O1,104,TXN6     testtest        127     0       2019-05-23 12:38:55.525
0       A4,L1,G1,P1,O1,100,TXN7     testtest        128     0       2019-05-23 12:38:56.526
0       A1,L1,G1,P1,O1,500,TXNID8   testtest        129     0       2019-05-23 12:38:57.526
0       A6,L2,G2,P1,O1,500,TXNID9   testtest        130     0       2019-05-23 12:38:57.526

What I am looking for is to split the value attribute of the Kafka message so that the data matches the Hive table; it effectively becomes a 12-column table (A3,L1,G1,P1,O1,101,TXN1 splits into 7 attributes). I need some additional transformation, similar to the .option("quote", "\u0000") I applied while writing the DataFrame, but that doesn't seem to work.

Upvotes: 1

Views: 2517

Answers (1)

randal25

Reputation: 1330

Once you have your stream set up and consuming from Kafka, you can use the foreachBatch function like so.

val yourStream = spark
  .readStream                         // readStream (not read) for a streaming source
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "your_topic")  // the Kafka source requires a topic subscription
  .load()

val query = yourStream.writeStream
  .foreachBatch { (batchDs: Dataset[_], batchId: Long) =>
    batchDs
      .write
      .mode(SaveMode.Append)
      .insertInto("your_db.your_table")
  }
  .start()

query.awaitTermination()
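
If the job needs to survive restarts, you can also give the streaming query a checkpoint location so it resumes from the stored Kafka offsets (a sketch; the path is a placeholder):

val query = yourStream.writeStream
  .foreachBatch { (batchDs: Dataset[_], batchId: Long) =>
    batchDs.write.mode(SaveMode.Append).insertInto("your_db.your_table")
  }
  .option("checkpointLocation", "/tmp/checkpoints/your_table")  // placeholder path
  .start()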

To get the string split by "," into separate columns, you can use the split function to turn the comma-delimited value into an array, and then select the items individually by index, e.g. "SPLIT(CAST(value AS STRING), ',')[0]" to get the first element.
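
If you prefer the DataFrame API over SQL expressions, the same split can be written like this (a sketch; the alias names v0..v6 are illustrative only):

import org.apache.spark.sql.functions.{col, split}

// Split the comma-delimited value into an array once, then pick items by index.
val parts = split(col("value").cast("string"), ",")
val splitDF = csvDF.select(
  col("key").cast("string"),
  parts.getItem(0).as("v0"),
  parts.getItem(1).as("v1"),
  parts.getItem(2).as("v2"),
  parts.getItem(3).as("v3"),
  parts.getItem(4).as("v4"),
  parts.getItem(5).as("v5"),
  parts.getItem(6).as("v6"),
  col("topic").cast("string"),
  col("offset").cast("string"),
  col("partition").cast("string"),
  col("timestamp").cast("string"))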

So replace

val abcd = csvDF.selectExpr(
    "CAST(key AS STRING)", "CAST(value AS STRING)", "CAST(topic AS STRING)",
    "CAST(offset AS STRING)", "CAST(partition AS STRING)", "CAST(timestamp AS STRING)")
  .as[(String, String, String, String, String, String)]

with

val abcd = csvDF.selectExpr(
    "CAST(key AS STRING)",
    "SPLIT(CAST(value AS STRING), ',')[0]",
    "SPLIT(CAST(value AS STRING), ',')[1]",
    "SPLIT(CAST(value AS STRING), ',')[2]",
    "SPLIT(CAST(value AS STRING), ',')[3]",
    "SPLIT(CAST(value AS STRING), ',')[4]",
    "SPLIT(CAST(value AS STRING), ',')[5]",
    "SPLIT(CAST(value AS STRING), ',')[6]",
    "CAST(topic AS STRING)",
    "CAST(offset AS STRING)",
    "CAST(partition AS STRING)",
    "CAST(timestamp AS STRING)")
  .as[(String, String, String, String, String, String,
       String, String, String, String, String, String)]

Note that the tuple type needs twelve String entries to match the twelve selected columns, and insertInto matches columns by position, so keep them in the same order as the Hive table definition.

Upvotes: 2
