jk1

Reputation: 623

Why does Spark repeat transformations after persist operations?

I have the following code. I run count to trigger the persist operation and materialize the transformations above it. But I noticed in the DAG and stages that the two different count jobs both execute the first persist (it runs twice), whereas I expect the second count call to use the second persist.

val df = sparkSession.read
      .parquet(bigData)
      .filter(row => dateRange(row.getLong(5), lowerTimeBound, upperTimeBound))
      .as[SafegraphRawData]
      // Repartition here to be able to perform shuffle operations later
      // (plus other transformations and minor filtering)
      .repartition(nrInputPartitions)
      // First persist here since the objects do not fit in memory (Persist 67)
      .persist(StorageLevel.MEMORY_AND_DISK)

    LOG.info(s"First count  = " + df.count)

    val filter: BaseFilter = new BaseFilter()

    LOG.info(s"Number of partitions: " + df.rdd.getNumPartitions)
    val rddPoints = df
      .map(parse)
      .filter(filter.IsValid(_, deviceStageMetricService, providerdevicelist, sparkSession))
      .map(convert)
    // Since we will perform count and partitionBy actions, compute all the above transformations here (second persist)
    val dsPoints = rddPoints.persist(StorageLevel.MEMORY_AND_DISK)
    val totalPoints = dsPoints.count()
    LOG.info(s"Second count  = $totalPoints")

First count: LOG.info(s"First count  = " + df.count)

Second count: LOG.info(s"Second count  = $totalPoints")

Upvotes: 1

Views: 2050

Answers (3)

krk

Reputation: 9

Try:

val df = sparkSession.read
      .parquet(bigData)
      .filter(row => dateRange(row.getLong(5), lowerTimeBound, upperTimeBound))
      .as[SafegraphRawData]
      // Repartition here to be able to perform shuffle operations later
      // (plus other transformations and minor filtering)
      .repartition(nrInputPartitions)

// Persist as a separate statement since the objects do not fit in memory
df.persist(StorageLevel.MEMORY_AND_DISK)

Upvotes: -1

Avishek Bhattacharya

Reputation: 6994

When you use StorageLevel.MEMORY_AND_DISK, Spark tries to fit all the data into memory, and if it doesn't fit it spills to disk.

Now you are doing multiple persists here. In Spark the memory cache is LRU, so the later persists will evict the previously cached data.

Even if you specify StorageLevel.MEMORY_AND_DISK, when data is evicted from cache memory by other cached data, Spark doesn't spill it to disk. So when you do the next count it needs to re-evaluate the DAG so that it can recompute the partitions which aren't present in the cache.

I would suggest using StorageLevel.DISK_ONLY to avoid such recomputation.
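
As a rough sketch, that suggestion applied to the first persist from the question (identifiers such as bigData, dateRange and SafegraphRawData are the asker's own):

import org.apache.spark.storage.StorageLevel

// Persist the repartitioned dataset to disk only, so partitions evicted from
// memory are not silently dropped and recomputed by a later action.
val df = sparkSession.read
      .parquet(bigData)
      .filter(row => dateRange(row.getLong(5), lowerTimeBound, upperTimeBound))
      .as[SafegraphRawData]
      .repartition(nrInputPartitions)
      .persist(StorageLevel.DISK_ONLY)

LOG.info(s"First count  = " + df.count) // the action that materializes the cache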

Upvotes: 6

Ishan Kumar

Reputation: 2082

Here is the whole scenario.

persist and cache are also lazy transformations in Spark. After applying either of them, you need to run an action in order to actually cache the RDD or DataFrame in memory.
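
For instance, a small sketch using the df and storage level from the question:

val cached = df.persist(StorageLevel.MEMORY_AND_DISK) // lazy: nothing is cached yet
cached.count()                                         // the action actually populates the cache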

Secondly, the unit of cache or persist is the partition. When cache or persist is executed, it will save only those partitions that can be held in memory. For the remaining partitions that could not be saved in memory, the whole DAG will be executed again once a new action is encountered.
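
One way to see this partition-level behaviour is to check the storage info after the action has run. A minimal sketch, assuming the persisted df from the question; getRDDStorageInfo is a developer API, and the Spark UI's Storage tab shows the same numbers:

// Trigger the persist with an action, then report how many partitions
// actually made it into the cache versus the total number of partitions.
df.count()

sparkSession.sparkContext.getRDDStorageInfo.foreach { info =>
  LOG.info(s"RDD ${info.id}: ${info.numCachedPartitions} of ${info.numPartitions} partitions cached, " +
    s"${info.memSize} bytes in memory, ${info.diskSize} bytes on disk")
}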

Upvotes: 0
