Reputation: 888
One query on spark structured streaming integration with HIVE table.
I have tried to do some examples of spark structured streaming.
here is my example
val spark =SparkSession.builder().appName("StatsAnalyzer")
.enableHiveSupport()
.config("hive.exec.dynamic.partition", "true")
.config("hive.exec.dynamic.partition.mode", "nonstrict")
.config("spark.sql.streaming.checkpointLocation", "hdfs://pp/apps/hive/warehouse/ab.db")
.getOrCreate()
// Register the dataframe as a Hive table
val userSchema = new StructType().add("name", "string").add("age", "integer")
val csvDF = spark.readStream.option("sep", ",").schema(userSchema).csv("file:///home/su/testdelta")
csvDF.createOrReplaceTempView("updates")
val query= spark.sql("insert into table_abcd select * from updates")
query.writeStream.start()
As you can see in the last step while writing data-frame to hdfs location, , the data is not getting inserted into the exciting directory (my existing directory having some old data partitioned by "age").
I am getting
spark.sql.AnalysisException : queries with streaming source must be executed with writeStream start()
Can you help why i am not able to insert data in to existing directory in hdfs location ? or is there any other way that i can do "insert into " operation on hive table ?
Looking for a solution
Upvotes: 13
Views: 11165
Reputation: 18475
On HDP 3.1 with Spark 2.3.2 and Hive 3.1.0 we have used Hortonwork's spark-llap library to write structured streaming DataFrame from Spark to Hive. On GitHub you will find some documentation on its usage.
The required library hive-warehouse-connector-assembly-1.0.0.3.1.0.0-78.jar is available on Maven and needs to be passed on in the spark-submit
command. There are many more recent versions of that library, although I haven't had the chance to test them.
After creating the Hive table manually (e.g. through beeline/Hive shell) you could apply the following code:
import com.hortonworks.hwc.HiveWarehouseSession
val csvDF = spark.readStream.[...].load()
val query = csvDF.writeStream
.format(HiveWarehouseSession.STREAM_TO_STREAM)
.option("database", "database_name")
.option("table", "table_name")
.option("metastoreUri", spark.conf.get("spark.datasource.hive.warehouse.metastoreUri"))
.option("checkpointLocation", "/path/to/checkpoint/dir")
.start()
query.awaitTermination()
Upvotes: 2
Reputation: 74619
Spark Structured Streaming does not support writing the result of a streaming query to a Hive table.
scala> println(spark.version)
2.4.0
val sq = spark.readStream.format("rate").load
scala> :type sq
org.apache.spark.sql.DataFrame
scala> assert(sq.isStreaming)
scala> sq.writeStream.format("hive").start
org.apache.spark.sql.AnalysisException: Hive data source can only be used with tables, you can not write files of Hive data source directly.;
at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:246)
... 49 elided
If a target system (aka sink) is not supported you could use use foreach and foreachBatch operations (highlighting mine):
The
foreach
andforeachBatch
operations allow you to apply arbitrary operations and writing logic on the output of a streaming query. They have slightly different use cases - whileforeach
allows custom write logic on every row,foreachBatch
allows arbitrary operations and custom logic on the output of each micro-batch.
I think foreachBatch
is your best bet.
import org.apache.spark.sql.DataFrame
sq.writeStream.foreachBatch { case (ds: DataFrame, batchId: Long) =>
// do whatever you want with your input DataFrame
// incl. writing to Hive
// I simply decided to print out the rows to the console
ds.show
}.start
There is also Apache Hive Warehouse Connector that I've never worked with but seems like it may be of some help.
Upvotes: 9
Reputation: 2345
Just in case someone actually tried the code from Jacek Laskowski he knows that it does not really compile in Spark 2.4.0 (check my gist tested on AWS EMR 5.20.0 and vanilla Spark). So I guess that was his idea of how it should work in some future Spark version. The real code is:
scala> import org.apache.spark.sql.Dataset
import org.apache.spark.sql.Dataset
scala> sq.writeStream.foreachBatch((batchDs: Dataset[_], batchId: Long) => batchDs.show).start
res0: org.apache.spark.sql.streaming.StreamingQuery =
org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@5ebc0bf5
Upvotes: 1