Avi

Reputation: 491

Read from a Hive table and write back to it using Spark SQL

I am reading a Hive table using Spark SQL and assigning it to a Scala val:

val x = sqlContext.sql("select * from some_table")

Then I do some processing on the DataFrame x and finally come up with a DataFrame y, which has the exact same schema as the table some_table.

Finally I try to insert-overwrite the DataFrame y into the same Hive table some_table:

y.write.mode(SaveMode.Overwrite).insertInto("some_table")

Then I get the error:

org.apache.spark.sql.AnalysisException: Cannot insert overwrite into table that is also being read from

I tried creating an INSERT SQL statement and firing it with sqlContext.sql(), but it gave me the same error.

Is there any way I can bypass this error? I need to insert the records back to the same table.


Hi, I tried doing as suggested, but I am still getting the same error:

val x = sqlContext.sql("select * from incremental.test2")
val y = x.limit(5)
y.registerTempTable("temp_table")
val dy = sqlContext.table("temp_table")
dy.write.mode("overwrite").insertInto("incremental.test2")

scala> dy.write.mode("overwrite").insertInto("incremental.test2")
org.apache.spark.sql.AnalysisException: Cannot insert overwrite into table that is also being read from.;

Upvotes: 23

Views: 48306

Answers (8)

Mardaunt

Reputation: 92

This was a good solution for me:

  1. Extract the RDD and schema from the DataFrame.

  2. Create a new clone DataFrame.

  3. Overwrite the table.

     // `spark` is the active SparkSession and `tableSource` holds the target
     // database/table names in the enclosing class.
     private def overWrite(df: DataFrame): Unit = {
       // Rebuilding the DataFrame from its RDD and schema breaks the link to the source table.
       val schema    = df.schema
       val rdd       = df.rdd
       val dfForSave = spark.createDataFrame(rdd, schema)
       dfForSave.write
                .mode(SaveMode.Overwrite)
                .insertInto(s"${tableSource.schema}.${tableSource.table}")
     }
    

Upvotes: 0

Rushabh Gujarathi

Reputation: 146

What you need to keep in mind before doing the below is that the Hive table you are overwriting should have been created by Hive DDL (a sketch of such a DDL follows the example), not by

spark (df.write.saveAsTable("<table_name>"))

If the above is not true, this won't work. I tested this in Spark 2.3.0:

val tableReadDf = spark.sql("select * from <dbName>.<tableName>")
val updatedDf   = tableReadDf.<transformation> // any update/delete/addition
updatedDf.createOrReplaceTempView("myUpdatedTable")
spark.sql("""with tempView as (select * from myUpdatedTable)
             insert overwrite table <dbName>.<tableName> <partition><partition_columns>
             select * from tempView""")
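As a rough sketch of that precondition, the target table would be defined with Hive DDL up front (the placeholder names and columns below are invented for illustration), rather than created implicitly by saveAsTable:

// Sketch only: the columns are made up; the point is that the table definition
// comes from Hive DDL, so Spark treats it as a plain Hive table.
spark.sql("""
  CREATE TABLE IF NOT EXISTS <dbName>.<tableName> (
    id   BIGINT,
    name STRING
  )
  STORED AS ORC
""")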

Upvotes: 0

Ramesh Gajula

Reputation: 1

You'll also get the error "Cannot overwrite a path that is also being read from" in a case where you are doing this:

  1. You do an "insert overwrite" into a Hive TABLE "A" from a VIEW "V" (that executes your logic),
  2. and that VIEW also references the same TABLE "A". I found this out the hard way, as the VIEW was deeply nested code that was querying "A" as well. Bummer.

It is like cutting the very branch you are sitting on :-(
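A minimal sketch of the failing pattern (the table and view names here are made up purely for illustration):

// The view's logic reads from table_a ...
spark.sql("CREATE OR REPLACE VIEW v_over_a AS SELECT * FROM table_a WHERE active = 'Y'")

// ... so overwriting table_a from that view trips the same check:
// "Cannot overwrite a path that is also being read from"
spark.sql("INSERT OVERWRITE TABLE table_a SELECT * FROM v_over_a")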

Upvotes: 0

dinesh028

Reputation: 2187

In the context of Spark 2.2:

  1. This error means that the process is reading from and writing to the same table.
  2. Normally this should work, as the process writes to a .hiveStaging... directory first.
  3. The error occurs with the saveAsTable method, because it overwrites the entire table instead of individual partitions.
  4. The error should not occur with the insertInto method, because it overwrites partitions, not the whole table.
  5. One reason this happens is that the Hive table has the following Spark TBLProperties in its definition. For the insertInto method, the problem goes away if you remove these Spark TBLProperties (one way to unset them is sketched below):

'spark.sql.partitionProvider', 'spark.sql.sources.provider', 'spark.sql.sources.schema.numPartCols', 'spark.sql.sources.schema.numParts', 'spark.sql.sources.schema.part.0', 'spark.sql.sources.schema.part.1', 'spark.sql.sources.schema.part.2', 'spark.sql.sources.schema.partCol.0', 'spark.sql.sources.schema.partCol.1'
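If you go that route, Hive's ALTER TABLE ... UNSET TBLPROPERTIES DDL is one way to drop them (a sketch with placeholder database/table names; run it from Hive or beeline if your Spark version rejects the statement):

spark.sql("""
  ALTER TABLE <dbName>.<tableName> UNSET TBLPROPERTIES (
    'spark.sql.partitionProvider', 'spark.sql.sources.provider',
    'spark.sql.sources.schema.numPartCols', 'spark.sql.sources.schema.numParts',
    'spark.sql.sources.schema.part.0', 'spark.sql.sources.schema.part.1',
    'spark.sql.sources.schema.part.2', 'spark.sql.sources.schema.partCol.0',
    'spark.sql.sources.schema.partCol.1'
  )
""")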

https://querydb.blogspot.com/2019/07/read-from-hive-table-and-write-back-to.html

When we upgraded our HDP to 2.6.3, Spark was updated from 2.2 to 2.3, which resulted in the error below:

Caused by: org.apache.spark.sql.AnalysisException: Cannot overwrite a path that is also being read from.;

at org.apache.spark.sql.execution.command.DDLUtils$.verifyNotReadPath(ddl.scala:906)

This error occurs for jobs that read from and write to the same path, like jobs with SCD logic.

Solution -

  1. Set --conf "spark.sql.hive.convertMetastoreOrc=false" (see the sketch below), or
  2. update the job so that it writes data to a temporary table, then reads from the temporary table and inserts it into the final table.
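For option 1, a minimal sketch of setting that flag programmatically instead of on spark-submit (the app name is a placeholder; the builder calls are standard Spark 2.x API):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("scd-job")                                     // placeholder name
  .enableHiveSupport()
  .config("spark.sql.hive.convertMetastoreOrc", "false")  // same flag as --conf above
  .getOrCreate()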

https://querydb.blogspot.com/2020/09/orgapachesparksqlanalysisexception.html

Upvotes: 2

Sai Kranthi

Reputation: 52

Read the data from the Hive table in Spark via HCatalog:

import org.apache.hadoop.io.WritableComparable
import org.apache.hadoop.mapreduce.InputFormat
import org.apache.hive.hcatalog.data.HCatRecord
import org.apache.hive.hcatalog.mapreduce.HCatInputFormat

val hconfig = new org.apache.hadoop.conf.Configuration()
HCatInputFormat.setInput(hconfig, "dbname", "tablename")

val inputFormat = (new HCatInputFormat).asInstanceOf[InputFormat[WritableComparable[_], HCatRecord]].getClass

// RDD of HCatRecord rows read through the Hadoop input format
val data = sc.newAPIHadoopRDD(hconfig, inputFormat, classOf[WritableComparable[_]], classOf[HCatRecord])

Upvotes: 0

matteus silva

Reputation: 21

You should first save your DataFrame y as a Parquet file:

y.write.parquet("temp_table")

Then load it back:

val parquetFile = sqlContext.read.parquet("temp_table")

And finally insert your data into your table:

parquetFile.write.insertInto("some_table")

Upvotes: 1

nsanglar

Reputation: 1742

Actually, you can also use checkpointing to achieve this. Since it breaks the data lineage, Spark is not able to detect that you are reading from and overwriting the same table:

 sqlContext.sparkContext.setCheckpointDir(checkpointDir)
 val ds = sqlContext.sql("select * from some_table").checkpoint()
 ds.write.mode("overwrite").saveAsTable("some_table")

Upvotes: 13

cheseaux

Reputation: 5325

You should first save your DataFrame y in a temporary table:

y.write.mode("overwrite").saveAsTable("temp_table")

Then you can overwrite rows in your target table:

val dy = sqlContext.table("temp_table")
dy.write.mode("overwrite").insertInto("some_table")

Upvotes: 12
