Reputation: 3470
I have been trying to find out why I am getting strange behavior when running a certain Spark job. The job will sometimes error out depending on where I place an action (a .show(1) call): either right after caching the DataFrame or right before writing the DataFrame back to HDFS.
There is a very similar post on SO here:
Basically the other post explains that when you read from the same HDFS directory that you are writing to, and your SaveMode is "overwrite", then you will get a java.io.FileNotFoundException.
But here I am finding that just moving where the action is placed in the program can give very different results: either the program completes or it throws this exception.
I was wondering if anyone can explain why Spark is not being consistent here?
val myDF = spark.read.format("csv")
  .option("header", "false")
  .option("delimiter", "\t")
  .schema(schema)
  .load(myPath)

// If I cache or persist it here and then run an action right after the cache, the job
// will occasionally not throw the error. This is with a completely restarted SparkSession,
// so there is no risk of another user interfering on the same JVM.
myDF.cache()
myDF.show(1)

// Just an example.
// Many different transformations are then applied...
val secondDF = mergeOtherDFsWithmyDF(myDF, otherDF, thirdDF)
val fourthDF = mergeTwoDFs(thirdDF, StringToCheck, fifthDF)

// The .show(1) below is the same action call as was made above, only this one ALWAYS
// completes successfully, while the .show(1) above sometimes throws FileNotFoundException
// and sometimes completes successfully. The only thing that changes between test runs is
// which of the two actions is executed: either fourthDF.show(1) or myDF.show(1) is left
// commented out.
fourthDF.show(1)

fourthDF.write
  .mode(writeMode)
  .option("header", "false")
  .option("delimiter", "\t")
  .csv(myPath)
Upvotes: 1
Views: 1867
Reputation: 1944
Spark only materializes RDDs on demand. Most actions, such as count(), require reading all partitions of the DataFrame, but actions such as take() and first() do not require all the partitions.
In your case, show(1) only needs a single partition, so only one partition is materialized and cached. Then, when you do a count(), all partitions need to be materialized and cached, to the extent your available memory allows.
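You can observe this yourself with a minimal sketch like the one below (it assumes a local SparkSession named spark; getRDDStorageInfo is a developer API, so its output may differ slightly between Spark versions), which prints how many partitions of a cached DataFrame are actually materialized after each kind of action:
// Build a DataFrame with 8 partitions and cache it
val df = spark.range(0L, 1000000L, 1L, 8).toDF("id")
df.cache()

df.show(1)   // typically scans only the first partition
spark.sparkContext.getRDDStorageInfo.foreach { info =>
  println(s"${info.name}: ${info.numCachedPartitions}/${info.numPartitions} partitions cached")
}

df.count()   // scans every partition, so the whole DataFrame ends up cached
spark.sparkContext.getRDDStorageInfo.foreach { info =>
  println(s"${info.name}: ${info.numCachedPartitions}/${info.numPartitions} partitions cached")
}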
Upvotes: 1
Reputation: 28412
Try using count instead of show(1). I believe the issue is due to Spark trying to be smart and not loading the whole DataFrame (since show does not need everything). Running count forces Spark to load and properly cache all the data, which will hopefully make the inconsistency go away.
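In terms of the code in the question, a minimal sketch of the suggested change (reusing the same placeholder names from the question) would be:
myDF.cache()
myDF.count()   // unlike show(1), count() scans every partition, so the full
               // DataFrame is cached before myPath is later overwritten

// ... transformations and fourthDF.write as before ...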
Upvotes: 3