Reputation: 379
val logList: RDD[String] = ...
val errorLogs = logList.filter(_.contains("Error")).persist()
//first action
val first100 = errorLogs.take(100)
//second action
val count = errorLogs.count
How will persist work in this case? In the case of the code below:
val errorLogs = logList.filter(_.contains("Error")).take(100)
Spark will not scan through all the logs, since Spark knows that we are interested in only 100 lines of logs. But what will happen when we cache this RDD and invoke multiple actions on it, with the first action requiring only a few records and the later ones needing the whole RDD to be transformed?
Will it cache all the records when the first action is invoked? Or will it cache only the partial set of records that the first action needed?
Upvotes: 3
Views: 776
Reputation: 1007
How cache works:

- cache and persist are lazy: if no action is called on the RDD marked for caching, the data is not persisted. Similarly, if a partition is never evaluated, its data is not persisted.
- cache persists whole partitions. It cannot persist a single record or a fraction of a partition. If data is marked for caching and a partition is at least partially evaluated, Spark will evaluate it fully and attempt to persist it.
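A quick sketch of that laziness (the path and RDD names here are placeholders, not from the question):

import org.apache.spark.storage.StorageLevel

val logs = sc.textFile("hdfs:///path/to/logs") // placeholder path
val errors = logs.filter(_.contains("Error")).persist(StorageLevel.MEMORY_ONLY)
// Nothing has been read or cached yet: persist only marks the RDD.
errors.take(10)
// Now the partitions take(10) had to evaluate are cached, each one in full.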
How limit works:

- limit evaluates the first partition. If all records required by limit can be collected from the first partition, the job finishes.
- If not, Spark increases the number of partitions to evaluate by spark.rdd.limit.scaleUpFactor (4 by default). If all records required by limit can be collected from these partitions, the job finishes. Otherwise this step repeats.
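The factor is configurable. A minimal, illustrative sketch of setting it when building the context (local mode and the app name are assumptions):

import org.apache.spark.{SparkConf, SparkContext}

// Raise the scale-up factor so each retry of take/limit scans
// more partitions; the default factor is 4.
val conf = new SparkConf()
  .setAppName("limit-demo")
  .setMaster("local[*]")
  .set("spark.rdd.limit.scaleUpFactor", "8")
val sc = new SparkContext(conf)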
This means that:

- val first100 = errorLogs.take(100) will cache at least the first partition. If the first partition doesn't contain 100 records, it will evaluate and cache subsequent partitions until it reaches 100 records or has evaluated the full dataset.
- val count = errorLogs.count will evaluate all partitions and cache the remaining partitions if possible.
- val errorLogs = logList.filter(_.contains("Error")).take(100) will work almost exactly the same as errorLogs.take(100). The only impact of filter is that limit might have to evaluate more data. If all lines contain Error, the result is the same as in the first step.

If this runs after the first two steps, and the data is fully cached and hasn't been evicted, it will use the data from the cache.
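One way to observe this (a sketch; getRDDStorageInfo is a developer API, and the RDDs are assumed to be the ones from the question):

// Count how many partitions of a given RDD are currently cached.
def cachedPartitions(id: Int): Int =
  sc.getRDDStorageInfo.find(_.id == id).map(_.numCachedPartitions).getOrElse(0)

errorLogs.take(100) // evaluates (and caches) only the partitions it needed
println(cachedPartitions(errorLogs.id)) // often fewer than errorLogs.getNumPartitions

errorLogs.count // evaluates every partition
println(cachedPartitions(errorLogs.id)) // all partitions, memory permitting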
Upvotes: 2
Reputation: 61
In this case Spark will cache only the minimum number of partitions required to collect 100 records (due to the take implementation, the actual number can be higher).
Only the second action (count) is guaranteed to cache all records.
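A practical consequence (a sketch, reusing the question's RDDs): if later actions need the whole dataset, run a full action such as count first, so the cache is completely populated before take:

val errorLogs = logList.filter(_.contains("Error")).persist()
errorLogs.count // forces every partition to be evaluated and cached
val first100 = errorLogs.take(100) // now served from the cache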
Upvotes: 6