Sharath

Reputation: 41

Overwriting the parquet file throws exception in spark

I am trying to read a parquet file from an HDFS location, apply some transformations, and overwrite the file in the same location. I have to overwrite the file in the same location because I need to run the same code multiple times.

Here is the code I have written:

val df = spark.read.option("header", "true").option("inferSchema", "true").parquet("hdfs://master:8020/persist/local/")
//after applying some transformations lets say the final dataframe is transDF which I want to overwrite at the same location.
transDF.write.mode("overwrite").parquet("hdfs://master:8020/persist/local/")

Now the problem is that, because of overwrite mode, Spark seems to delete the files at the given location before it has finished reading them. So when executing the code I get the following error:

File does not exist: hdfs://master:8020/persist/local/part-00000-e73c4dfd-d008-4007-8274-d445bdea3fc8-c000.snappy.parquet

Any suggestions on how to solve this problem? Thanks.

Upvotes: 1

Views: 4524

Answers (2)

BigData-Guru

Reputation: 1261

This is what I tried and it worked. My requirement was almost the same: an upsert operation.

By the way, the spark.conf.set('spark.sql.sources.partitionOverwriteMode', 'dynamic') property was set. Even then the transform job kept failing, so I did the following:

  1. Took a backup of the S3 folder (the final curated layer) before every batch operation
  2. Using DataFrame operations, deleted the S3 parquet file location before the overwrite
  3. Then appended the new data to that location

Previously the entire job ran for about 1.5 hours and failed frequently. Now the whole operation takes 10-15 minutes.
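
For illustration, a minimal sketch of the backup / delete / append pattern described above. The bucket names and paths are hypothetical, and it assumes a Hadoop-compatible filesystem (s3a or HDFS):

import org.apache.hadoop.fs.{FileSystem, Path}

// hypothetical locations -- replace with your own
val curatedPath = "s3a://my-bucket/curated/table"
val backupPath  = "s3a://my-bucket/curated_backup/table"

// dynamic partition overwrite, as mentioned above
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

// 1. take a backup of the curated folder before the batch operation
spark.read.parquet(curatedPath).write.mode("overwrite").parquet(backupPath)

// 2. build the new output from the backup copy so nothing still reads curatedPath,
//    then delete the curated location explicitly instead of relying on overwrite
val transDF = spark.read.parquet(backupPath) // ... apply the batch transformations here
val fs = FileSystem.get(new java.net.URI(curatedPath), spark.sparkContext.hadoopConfiguration)
fs.delete(new Path(curatedPath), true)

// 3. append the result to the (now empty) curated location
transDF.write.mode("append").parquet(curatedPath)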

Upvotes: -1

Assaf Mendelson

Reputation: 12991

The simple answer is that you cannot overwrite what you are reading. The reason is that overwrite would need to delete everything first; however, since Spark works in parallel, some portions might still be being read at that time. Furthermore, even if everything had already been read, Spark needs the original files to recompute tasks that fail.

Since you need the input for multiple iterations, I would simply make the input and output paths arguments of the function that performs one iteration, and delete the previous iteration's output only once the write has succeeded.
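
A minimal sketch of that approach, assuming hypothetical paths (the local_a / local_b suffixes are made up for illustration) and that each iteration writes to a fresh location before the previous one is removed:

import java.net.URI
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SparkSession

def runIteration(spark: SparkSession, inputPath: String, outputPath: String): Unit = {
  val df = spark.read.parquet(inputPath)
  val transDF = df // ... apply the transformations here
  // write to a location different from the one being read
  transDF.write.mode("overwrite").parquet(outputPath)
  // only after the write has succeeded, drop the previous iteration's data
  val fs = FileSystem.get(new URI(inputPath), spark.sparkContext.hadoopConfiguration)
  fs.delete(new Path(inputPath), true)
}

// e.g. alternate between two locations across iterations
runIteration(spark, "hdfs://master:8020/persist/local_a/", "hdfs://master:8020/persist/local_b/")
runIteration(spark, "hdfs://master:8020/persist/local_b/", "hdfs://master:8020/persist/local_a/")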

Upvotes: 6
