BigD

Reputation: 888

How to insert spark structured streaming DataFrame to Hive external table/location?

I have a question about Spark Structured Streaming integration with a Hive table.

I have been trying out some Spark Structured Streaming examples.

Here is my example:

 val spark = SparkSession.builder().appName("StatsAnalyzer")
     .enableHiveSupport()
     .config("hive.exec.dynamic.partition", "true")
     .config("hive.exec.dynamic.partition.mode", "nonstrict")
     .config("spark.sql.streaming.checkpointLocation", "hdfs://pp/apps/hive/warehouse/ab.db")
     .getOrCreate()

 // Register the dataframe as a Hive table

 val userSchema = new StructType().add("name", "string").add("age", "integer")
 val csvDF = spark.readStream.option("sep", ",").schema(userSchema).csv("file:///home/su/testdelta")
 csvDF.createOrReplaceTempView("updates")
 val query = spark.sql("insert into table_abcd select * from updates")

 query.writeStream.start()

As you can see, in the last step, while writing the DataFrame to the HDFS location, the data is not getting inserted into the existing directory (my existing directory has some old data partitioned by "age").

I am getting

org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();

Can you help me understand why I am not able to insert data into the existing directory at the HDFS location? Or is there any other way to do an "insert into" operation on a Hive table?

Looking for a solution

Upvotes: 13

Views: 11165

Answers (3)

Michael Heil

Reputation: 18475

On HDP 3.1 with Spark 2.3.2 and Hive 3.1.0 we have used Hortonworks' spark-llap library to write a Structured Streaming DataFrame from Spark to Hive. On GitHub you will find some documentation on its usage.

The required library hive-warehouse-connector-assembly-1.0.0.3.1.0.0-78.jar is available on Maven and needs to be passed in the spark-submit command. There are many more recent versions of that library, although I haven't had the chance to test them.

After creating the Hive table manually (e.g. through beeline/Hive shell) you could apply the following code:

import com.hortonworks.hwc.HiveWarehouseSession

// Streaming source; fill in [...] with your format, options and schema
val csvDF = spark.readStream.[...].load()

// Stream the DataFrame into the Hive table through the Hive Warehouse Connector
val query = csvDF.writeStream
  .format(HiveWarehouseSession.STREAM_TO_STREAM)
  .option("database", "database_name")
  .option("table", "table_name")
  .option("metastoreUri", spark.conf.get("spark.datasource.hive.warehouse.metastoreUri"))
  .option("checkpointLocation", "/path/to/checkpoint/dir")
  .start()

query.awaitTermination()
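
As a hedged follow-up (a sketch based on the connector's documented API, not part of the original setup), the streamed rows can be read back through the same connector to verify the ingest:

// Hypothetical verification step: query the target table back through HWC
val hive = HiveWarehouseSession.session(spark).build()
hive.executeQuery("SELECT * FROM database_name.table_name").show()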

Upvotes: 2

Jacek Laskowski
Jacek Laskowski

Reputation: 74619

Spark Structured Streaming does not support writing the result of a streaming query to a Hive table.

scala> println(spark.version)
2.4.0

scala> val sq = spark.readStream.format("rate").load
scala> :type sq
org.apache.spark.sql.DataFrame

scala> assert(sq.isStreaming)

scala> sq.writeStream.format("hive").start
org.apache.spark.sql.AnalysisException: Hive data source can only be used with tables, you can not write files of Hive data source directly.;
  at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:246)
  ... 49 elided

If a target system (aka sink) is not supported, you could use the foreach and foreachBatch operations (highlighting mine):

The foreach and foreachBatch operations allow you to apply arbitrary operations and writing logic on the output of a streaming query. They have slightly different use cases - while foreach allows custom write logic on every row, foreachBatch allows arbitrary operations and custom logic on the output of each micro-batch.

I think foreachBatch is your best bet.

import org.apache.spark.sql.DataFrame
sq.writeStream.foreachBatch { case (ds: DataFrame, batchId: Long) =>
  // do whatever you want with your input DataFrame
  // incl. writing to Hive
  // I simply decided to print out the rows to the console
  ds.show
}.start
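
For the original question specifically, a minimal sketch (untested, and assuming the SparkSession was created with enableHiveSupport(), as in the question) of writing each micro-batch into the existing table_abcd could look like this:

import org.apache.spark.sql.{DataFrame, SaveMode}

csvDF.writeStream
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    // Inside foreachBatch the DataFrame is a plain batch DataFrame,
    // so the regular batch writer can append into the existing Hive table
    // (dynamic partitioning is governed by the configs set in the question)
    batchDF.write.mode(SaveMode.Append).insertInto("table_abcd")
  }
  .option("checkpointLocation", "/path/to/checkpoint/dir")
  .start()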

There is also the Apache Hive Warehouse Connector, which I've never worked with, but it seems like it may be of some help.

Upvotes: 9

Viacheslav Rodionov

Reputation: 2345

In case anyone actually tried the code from Jacek Laskowski's answer: it does not really compile in Spark 2.4.0 (check my gist, tested on AWS EMR 5.20.0 and vanilla Spark). So I guess that was his idea of how it should work in some future Spark version. The working code is:

scala> import org.apache.spark.sql.Dataset
import org.apache.spark.sql.Dataset

scala> sq.writeStream.foreachBatch((batchDs: Dataset[_], batchId: Long) => batchDs.show).start
res0: org.apache.spark.sql.streaming.StreamingQuery = 
org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@5ebc0bf5

Upvotes: 1
