Albin

Reputation: 379

How caching an RDD works in the case of multiple actions on a persisted RDD

val logList: RDD[String] = ...
val errorLogs = logList.filter(_.contains("Error")).persist()
//first action   
val first100 = errorLogs.take(100)
//second action
val count = errorLogs.count 

How will persist work in this case? Consider the code below:

val errorLogs = logList.filter(_.contains("Error")).take(100)

Spark will not scan through all the logs, since it knows that we are interested in only 100 lines. But what will happen when we cache this RDD and invoke multiple actions on it, with the first action requiring only a few records and later ones needing the whole RDD to be evaluated?

Will it cache the whole RDD when the first action is invoked, or will it cache only the records that were needed for the first action?

Upvotes: 3

Views: 776

Answers (2)

Aaron Makubuya

Reputation: 1007

How cache works:

  • cache and persist are lazy: if no action is called on the RDD marked for caching, the data is not persisted. Similarly, if a partition is not evaluated, its data is not persisted (see the sketch after this list).
  • cache persists whole partitions. It cannot persist a single record or a fraction of a partition. If data is marked for caching and a partition is at least partially evaluated, Spark will evaluate it fully and attempt to persist it.
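A minimal sketch of that laziness, assuming an existing SparkContext named sc, the logList RDD from the question, and no other cached RDDs; sc.getRDDStorageInfo only reports RDDs that actually have materialized blocks:

import org.apache.spark.storage.StorageLevel

val errorLogs = logList.filter(_.contains("Error")).persist(StorageLevel.MEMORY_ONLY)

// persist alone is just a marker; nothing has been evaluated yet
println(sc.getRDDStorageInfo.isEmpty)    // expected: true (assuming no other cached RDDs)

errorLogs.count()                        // an action forces evaluation

// now the RDD shows up with materialized (cached) partitions
sc.getRDDStorageInfo.foreach { info =>
  println(s"RDD ${info.id}: ${info.numCachedPartitions}/${info.numPartitions} partitions cached")
}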

How limit works:

  • First, limit evaluates the first partition. If all the records required by limit can be collected from the first partition, the job finishes.
  • If not, Spark increases the number of partitions to be evaluated by spark.rdd.limit.scaleUpFactor. If all the records required by limit can be collected from these partitions, the job finishes. Otherwise this step repeats (a configuration sketch follows this list).
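A sketch of where that factor is configured, assuming a local job; 4 is believed to be the default for spark.rdd.limit.scaleUpFactor, and the input path is purely illustrative:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("take-demo")
  .setMaster("local[*]")
  .set("spark.rdd.limit.scaleUpFactor", "4")
val sc = new SparkContext(conf)

val logList = sc.textFile("logs/")   // illustrative path
// take(100) scans partition 0 first; if it yields fewer than 100 matching lines,
// the next pass scans roughly scaleUpFactor times as many partitions, and so on
val first100 = logList.filter(_.contains("Error")).take(100)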

This means that:

  • val first100 = errorLogs.take(100) will cache at least the first partition. If the first partition doesn't contain 100 records, it will evaluate and cache subsequent partitions until it gets to 100 records or has evaluated the full dataset.
  • val count = errorLogs.count will evaluate all partitions and cache the remaining partitions if possible.
  • val errorLogs = logList.filter(_.contains("Error")).take(100) will work almost exactly the same as errorLogs.take(100). The only impact of filter is that limit might have to evaluate more data. If all lines contain Error, the result is the same as in the first step.

    If this runs after the first two steps, and the data is fully cached and hasn't been evicted, it will use the data from the cache (the sketch after this list puts these steps together).
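Putting the bullets above together, a sketch of what one would expect to observe, assuming the sc and logList from the question (the exact counts depend on how the data is partitioned and on available storage):

val errorLogs = logList.filter(_.contains("Error")).persist()

val first100 = errorLogs.take(100)
// only the partitions take() had to evaluate are cached at this point
sc.getRDDStorageInfo.find(_.id == errorLogs.id).foreach { info =>
  println(s"after take:  ${info.numCachedPartitions}/${info.numPartitions} partitions cached")
}

val count = errorLogs.count()
// count() evaluates every partition, so the rest get cached too (storage permitting)
sc.getRDDStorageInfo.find(_.id == errorLogs.id).foreach { info =>
  println(s"after count: ${info.numCachedPartitions}/${info.numPartitions} partitions cached")
}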

Upvotes: 2

user10162213

Reputation: 61

In this case (the take(100) action) Spark will cache only the minimum number of partitions required to collect 100 records (due to the take implementation, the actual number can be higher).

Only the second action (count) is guaranteed to cache all records.
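If the intent is to have the whole filtered RDD in the cache before the partial read, one way to arrange that (a sketch reusing the names from the question) is to run the full-scan action first:

val errorLogs = logList.filter(_.contains("Error")).persist()

// evaluate (and cache) every partition up front
val count = errorLogs.count()

// this read can now be served from the already-cached partitions
val first100 = errorLogs.take(100)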

Upvotes: 6
