Rod

Reputation: 366

FileNotFoundException: Spark save fails. Cannot clear cache from Dataset[T] avro

I get the following error when saving a dataframe in Avro for the second time. If I delete sub_folder/part-00000-XXX-c000.avro after saving and then try to save the same dataset again, I get:

FileNotFoundException: File /.../main_folder/sub_folder/part-00000-3e7064c0-4a82-424c-80ca-98ce75766972-c000.avro does not exist. It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.

The message suggests that the tables need to be refreshed, but as the output of sparkSession.catalog.listTables().show() shows, there are no tables to refresh:

+----+--------+-----------+---------+-----------+
|name|database|description|tableType|isTemporary|
+----+--------+-----------+---------+-----------+
+----+--------+-----------+---------+-----------+

The previously saved dataframe looks like this. The application is supposed to update it:

+--------------------+--------------------+
|              Col1  |               Col2 |
+--------------------+--------------------+
|[123456, , ABC, [...|[[v1CK, RAWNAME1_,..|
|[123456, , ABC, [...|[[BG8M, RAWNAME2_...|
+--------------------+--------------------+

For me this is clearly a cache problem. However, all attempts at clearing the cache have failed:

dataset.write
  .format("avro")
  .option("path", path)
  .mode(SaveMode.Overwrite) // Any save mode gives the same error
  .save()

// Moving this either before or after saving doesn't help.
sparkSession.catalog.clearCache()

// This will not un-persist any cached data that is built upon this Dataset.
dataset.cache().unpersist()
dataset.unpersist()

And this is how I read the dataset:

private def doReadFromPath[T <: SpecificRecord with Product with Serializable: TypeTag: ClassTag](path: String): Dataset[T] = {

    val df = sparkSession.read
      .format("avro")
      .load(path)
      .select("*")

    df.as[T]
  }

Finally, this is the stack trace. Thanks a lot for your help!

ERROR [task-result-getter-3] (Logging.scala:70) - Task 0 in stage 9.0 failed 1 times; aborting job
ERROR [main] (Logging.scala:91) - Aborting job 150de02a-ac6a-4d42-824d-5db44a98c19a.
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 9.0 failed 1 times, most recent failure: Lost task 0.0 in stage 9.0 (TID 11, localhost, executor driver): org.apache.spark.SparkException: Task failed while writing rows.
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:254)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:168)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.FileNotFoundException: File file:/DATA/XXX/main_folder/sub_folder/part-00000-3e7064c0-4a82-424c-80ca-98ce75766972-c000.avro does not exist
It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:127)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:177)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:101)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:241)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:239)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:245)
    ... 10 more

Upvotes: 3

Views: 4064

Answers (2)

Rod

Reputation: 366

Thanks a lot Ram Ghadiyaram!

Solution 2 solved my problem, but only on my local Ubuntu machine. When I tested on HDFS, the problem remained.

Solution 1 was the definitive fix. This is how my code looks now:

private def doWriteToPath[T <: Product with Serializable: TypeTag: ClassTag](dataset: Dataset[T], path: String): Unit = {

  // clear any previously cached avro
  sparkSession.catalog.clearCache()

  // update the cache for this particular dataset, and trigger an action
  dataset.cache().show(1)

  dataset.write
    .format("avro")
    .option("path", path)
    .mode(SaveMode.Overwrite)
    .save()
}

Some remarks: I had indeed checked that post and attempted its solution, without success. I had ruled it out as my problem for the following reasons:

  • I had created a /temp under 'main_folder', called 'sub_folder_temp', and saving still failed.
  • Saving the same non-empty dataset in the same path but in json format actually works without the workaround discussed here.
  • Saving an empty dataset with the same type [T] in the same path actually works without the workaround discussed here.

Upvotes: 1

Ram Ghadiyaram

Reputation: 29237

*Reading from a location and writing back to the same location causes this issue. It was also discussed in this forum, along with my answer there.*

The message below in the error is misleading. The actual issue is reading from and writing to the same location: because Spark evaluates lazily, the write re-reads the source files that the overwrite has already deleted.

You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL

I am giving an example different from yours (it uses Parquet; in your case it is Avro).

I have 2 options for you.

Option 1 (cache plus a light show action, as below):

import org.apache.spark.sql.functions._

val df = Seq((1, 10), (2, 20), (3, 30)).toDS.toDF("sex", "date")

df.show(false)

df.repartition(1).write.format("parquet").mode("overwrite").save(".../temp") // save it
val df1 = spark.read.format("parquet").load(".../temp") // read it back again

val df2 = df1.withColumn("cleanup", lit("Rod want to cleanup")) // like you said you want to clean it

// THE 2 STEPS BELOW ARE IMPORTANT: `cache` plus a light action like show(1),
// without which a FileNotFoundException will come.
df2.cache // cache to avoid FileNotFoundException
df2.show(2, false) // light action to avoid FileNotFoundException
// or println(df2.count) // any action works

df2.repartition(1).write.format("parquet").mode("overwrite").save(".../temp")
println("Rod saved in the same directory he read from; the final records after clean up are:")
df2.show(false)

Option 2:

1) Save the DataFrame to a different Avro folder.

2) Delete the old Avro folder.

3) Finally, rename the newly created Avro folder to the old name; this will work.
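The delete-and-rename steps of Option 2 can be sketched in plain Scala with java.nio.file — a minimal local-filesystem illustration only; the helper name swapInNewFolder is hypothetical, and on HDFS you would use the equivalent org.apache.hadoop.fs.FileSystem delete and rename calls instead:

```scala
import java.nio.file.{Files, Path}
import java.util.Comparator

// Hypothetical helper sketching steps 2) and 3) of Option 2 on a local
// filesystem: delete the old folder, then rename the freshly written
// temp folder into its place.
def swapInNewFolder(tempDir: Path, target: Path): Unit = {
  // 2) Delete the old folder, if present (children first, deepest paths last).
  if (Files.exists(target)) {
    Files.walk(target)
      .sorted(Comparator.reverseOrder[Path]())
      .forEach(p => Files.delete(p))
  }
  // 3) Rename the newly created folder to the old name.
  Files.move(tempDir, target)
}
```

Running this after writing the new Avro output to a temp folder leaves the data under the original name, so readers of the old path are unaffected.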

Upvotes: 5
