Reputation: 23
I have a Spark 1.3 application that performs typical ETL work: it reads from several different Hive tables, performs joins and other operations on the DataFrames, and finally saves the output as text files to an HDFS location.
This application works fine, except that stage 6 often encounters failures with error messages like "failed without being ACK'd". Spark retries and eventually completes all the stages successfully.
To speed up the retry process, I would like to cache the parent DataFrames of stage 6. I added .persist(StorageLevel.MEMORY_AND_DISK_SER) to the DataFrames used in stage 6. However, while the job was running, the Spark UI showed that nothing was cached/persisted, and when stage 6 failed all the needed data was recalculated.
This is the same on two clusters, one standalone and one under YARN. I also tried .cache() and .persist(StorageLevel.MEMORY_ONLY), with the same results. I'm not sure what the cause is. The data I want to cache is not big compared to the available memory (~50 GB vs ~500 GB). The only other thing worth mentioning is that the data I want to cache is never used more than once later on - in theory, if stage 6 didn't fail routinely, I wouldn't need to cache at all.
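For reference, here is a simplified sketch of what the job does (the Hive table, column, and path names are placeholders, not the real ones):

```scala
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.storage.StorageLevel

val hiveCtx = new HiveContext(sc)

// read the input Hive tables
val dfA = hiveCtx.table("db.table_a")
val dfB = hiveCtx.table("db.table_b")

// one of the parent DataFrames of the failing stage
val joined = dfA.join(dfB, dfA("key") === dfB("key"))
joined.persist(StorageLevel.MEMORY_AND_DISK_SER)

// final output written to HDFS as text
joined.rdd.saveAsTextFile("hdfs:///path/to/output")
```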
Does anyone have more insight into why persist/cache behaves like this?
Upvotes: 2
Views: 11005
Reputation: 1316
You may have cached the RDDs at a stage boundary, so that the actual caching and the error happen in the same stage. Try forcing the RDDs to materialize by running an action on each of them, e.g. a .count(), right after you cache them.
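For example (a minimal sketch; `parentDataFrame` stands for one of the parent DataFrames of your stage 6):

```scala
import org.apache.spark.storage.StorageLevel

val df = parentDataFrame.persist(StorageLevel.MEMORY_AND_DISK_SER)
df.count()   // the action forces the whole DataFrame to be computed and cached now
// later transformations on df can then read from the cache instead of recomputing
```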
Also note that when an executor dies, the data it caches goes with it. If your error falls into this category, you will have better luck with .checkpoint() instead of caching, since checkpointed data is written to reliable storage rather than kept on the executors.
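Note that in Spark 1.3 DataFrames do not expose checkpoint() directly, so a rough sketch (checkpoint directory and names are placeholders) would go through the underlying RDD and rebuild a DataFrame from it:

```scala
// Spark 1.3: checkpoint the underlying RDD, then rebuild the DataFrame on top of it.
sc.setCheckpointDir("hdfs:///tmp/spark-checkpoints")   // any reliable HDFS path

val rdd = parentDataFrame.rdd
rdd.checkpoint()   // mark the RDD for checkpointing
rdd.count()        // an action is still needed to actually write the checkpoint

// later stages built on this DataFrame recover from HDFS instead of recomputing the lineage
val checkpointed = hiveCtx.createDataFrame(rdd, parentDataFrame.schema)
```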
Lastly, I recommend spending the time to understand what actually causes the error you are seeing (it sounds like a timeout is too low), as otherwise you will keep wasting cycles recomputing data and eventually run into similar conditions again.
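For instance, assuming the "failed without being ACK'd" message really is an ACK timeout, one setting you could raise is spark.core.connection.ack.wait.timeout (the value below is only an illustration, tune it for your cluster):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// raise the ACK wait timeout (seconds) before creating the SparkContext
val conf = new SparkConf()
  .setAppName("etl-job")
  .set("spark.core.connection.ack.wait.timeout", "600")

val sc = new SparkContext(conf)
```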
Upvotes: 5