Reputation: 623
I have the following code. I call count to trigger each persist and materialize the transformations above it. But I noticed that the DAGs and stages of the two count jobs both invoke the first persist, whereas I expected the second count to invoke only the second persist.
val df = sparkSession.read
.parquet(bigData)
.filter(row => dateRange(row.getLong(5), lowerTimeBound, upperTimeBound))
.as[SafegraphRawData]
// Repartition here so that shuffle operations can be performed later
// other transformations and minor filtering
.repartition(nrInputPartitions)
// First persist here, since the objects do not fit in memory (Persist 67)
.persist(StorageLevel.MEMORY_AND_DISK)
LOG.info(s"First count = ${df.count}")
val filter: BaseFilter = new BaseFilter()
LOG.info(s"Number of partitions: ${df.rdd.getNumPartitions}")
val rddPoints = df
.map(parse)
.filter(filter.IsValid(_, deviceStageMetricService, providerdevicelist, sparkSession))
.map(convert)
// Second persist: materialize all the transformations above, since both count and partitionBy will run on the result
val dsPoints = rddPoints.persist(StorageLevel.MEMORY_AND_DISK)
val totalPoints = dsPoints.count()
LOG.info(s"Second count = $totalPoints")
Upvotes: 1
Views: 2050
Reputation: 9
Try this:
val df = sparkSession.read
.parquet(bigData)
.filter(row => dateRange(row.getLong(5), lowerTimeBound, upperTimeBound))
.as[SafegraphRawData]
// Repartition here so that shuffle operations can be performed later
// other transformations and minor filtering
.repartition(nrInputPartitions)
// First persist here, since the objects do not fit in memory (Persist 67)
df.persist(StorageLevel.MEMORY_AND_DISK)
Upvotes: -1
Reputation: 6994
When you specify StorageLevel.MEMORY_AND_DISK, Spark tries to fit all the data in memory and spills to disk whatever does not fit.
You are doing multiple persists here. In Spark the memory cache is LRU, so later persists will evict previously cached data.
Even if you specify StorageLevel.MEMORY_AND_DISK, when data is evicted from cache memory by other cached data, Spark doesn't spill it to disk. So when you do the next count, it needs to re-evaluate the DAG to retrieve the partitions that aren't present in the cache.
I would suggest using StorageLevel.DISK_ONLY to avoid such recomputation.
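A minimal sketch of that suggestion, not the asker's actual pipeline: spark.range stands in for the parquet read, and the object name, local master, row count and partition count are all assumptions made for illustration. It persists a Dataset with DISK_ONLY, runs count twice, and reads back the storage level Spark reports for it.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

object DiskOnlyPersistDemo {
  // Returns both counts plus the storage level Spark reports for the Dataset.
  def run(): (Long, Long, StorageLevel) = {
    val spark = SparkSession.builder()
      .master("local[2]")                 // assumption: local test session
      .appName("disk-only-persist-demo")
      .getOrCreate()
    try {
      // Hypothetical stand-in for the question's df (no parquet input here).
      val df = spark.range(0L, 100000L)
        .repartition(8)
        .persist(StorageLevel.DISK_ONLY)  // cached blocks go to disk, not memory

      val first = df.count()              // first action materializes the cache
      val second = df.count()             // second action can read the cached blocks
      val level = df.storageLevel         // level registered for this Dataset

      df.unpersist()                      // release the cached blocks when done
      (first, second, level)
    } finally {
      spark.stop()
    }
  }
}
```

Calling DiskOnlyPersistDemo.run() returns the two counts and the storage level; whether DISK_ONLY is faster than recomputing depends on how expensive the upstream transformations are compared with disk I/O.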
Upvotes: 6
Reputation: 2082
Here's the whole scenario.
persist and cache are lazy in Spark, just like transformations. After calling either of them, you must run an action for the RDD or DF to actually be cached.
Secondly, the unit of caching is the partition. When cache or persist is executed, it saves only those partitions that can be held in memory. For the remaining partitions that could not be saved, the whole DAG is executed again whenever a new action is encountered.
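The laziness described above can be observed with a small sketch. It uses the RDD API and a LongAccumulator as a hypothetical evaluation counter; the object name, local master and data size are assumptions for illustration. On a dataset this small every partition should fit in the cache, so the counter would be expected to stay at zero until the first count and not grow on the second.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

object LazyPersistDemo {
  // Returns how many times the map function had run:
  // (right after persist, after the first count, after the second count).
  def run(): (Long, Long, Long) = {
    val spark = SparkSession.builder()
      .master("local[2]")               // assumption: local test session
      .appName("lazy-persist-demo")
      .getOrCreate()
    val sc = spark.sparkContext
    try {
      // Hypothetical counter: incremented once per element the map evaluates.
      val evaluations = sc.longAccumulator("evaluations")

      val cached = sc.parallelize(1 to 1000, numSlices = 4)
        .map { x => evaluations.add(1); x }
        .persist(StorageLevel.MEMORY_AND_DISK) // only marks the RDD for caching

      val afterPersist = evaluations.value.longValue // no action has run yet
      cached.count()                                 // first action computes and caches
      val afterFirst = evaluations.value.longValue
      cached.count()                                 // served from cached partitions
      val afterSecond = evaluations.value.longValue

      (afterPersist, afterFirst, afterSecond)
    } finally {
      spark.stop()
    }
  }
}
```

If some partitions could not be cached, the third value would exceed the second, because only the missing partitions are recomputed from the lineage.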
Upvotes: 0