I want to log the number of rows in an RDD halfway between the start and end transformations. My code currently looks like this:
```scala
val transformation1 = firstTransformation(inputdata).cache() // Is this cache recommended or can I remove it?
log("Transformation1 count: " + transformation1.count())
val transformation2 = secondTransformation(transformation1).cache()
val finalX = transformation2.filter(row => row.contains("x"))
val finalY = transformation2.filter(row => row.contains("y"))
```
My problem is that `transformation1` is a huge RDD and takes up a lot of memory (it fits in memory, but causes memory problems later on). However, since I am performing two different operations on `transformation1` (`.count()` and `secondTransformation()`), it is normally recommended that it be cached.
This type of scenario is probably very common, so what is the recommended way of dealing with it? Should you always cache an RDD before an intermediate count, or can I remove the `.cache()` on `transformation1`?
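To see why caching is usually recommended when one RDD feeds two actions, here is a minimal sketch, assuming Spark 2.x or later (spark-core) on the classpath and a local master. The `map` inside `t1` is a hypothetical stand-in for `firstTransformation`, `t1.map(_.toUpperCase)` stands in for `secondTransformation`, and a `LongAccumulator` counts how many times the first transformation's function runs. Without caching, the intermediate `count()` and the later action each recompute `t1` from scratch.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object CountRecomputation {
  // Returns how many times the (hypothetical) first transformation's map
  // function ran over a 1,000-row input, with or without caching it.
  def timesFirstTransformationRan(cacheFirst: Boolean): Long = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("count-recomputation"))
    try {
      val calls = sc.longAccumulator("firstTransformationCalls")

      // Hypothetical stand-in for firstTransformation(inputdata)
      val t1: RDD[String] = sc.parallelize(1 to 1000, 4).map { i =>
        calls.add(1L) // one increment per row processed
        if (i % 2 == 0) s"x$i" else s"y$i"
      }
      if (cacheFirst) t1.cache()

      // Action 1: the intermediate count
      println("Transformation1 count: " + t1.count())

      // Hypothetical stand-in for secondTransformation, followed by action 2
      val t2 = t1.map(_.toUpperCase)
      t2.filter(_.contains("X")).count()

      if (cacheFirst) t1.unpersist()
      calls.sum
    } finally sc.stop()
  }
}
```

Accumulator updates made inside transformations can be over-counted if tasks are retried, so this is only a rough diagnostic, but in a failure-free local run it shows the recomputation clearly.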
If you are having memory problems, you should unpersist each RDD as soon as you no longer need it, and you could also persist to disk instead of memory.
```scala
import org.apache.spark.storage.StorageLevel

val transformation1 = firstTransformation(inputdata).persist(StorageLevel.DISK_ONLY)
log("Transformation1 count: " + transformation1.count())
val transformation2 = secondTransformation(transformation1).persist(StorageLevel.DISK_ONLY)
val finalX = transformation2.filter(row => row.contains("x"))
val finalY = transformation2.filter(row => row.contains("y"))
// ... run the actions on finalX and finalY here ...
// Once all the actions are done, release the persisted data
transformation1.unpersist()
transformation2.unpersist()
```
If you can call `unpersist` before the memory issues occur, it is better to cache (keep the data in memory) instead of persisting to disk.
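A minimal sketch of unpersisting early so that plain `cache()` remains usable, assuming Spark 2.x or later in local mode. The `map` calls are hypothetical stand-ins for `firstTransformation` and `secondTransformation`. The idea is to force `transformation2` with its own `count()` while `transformation1` is still cached, then release `transformation1` so it no longer occupies memory while the `finalX` and `finalY` actions run.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object EarlyUnpersist {
  // Returns (finalX count, finalY count, number of RDDs still persisted
  // when the final actions start).
  def run(): (Long, Long, Int) = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("early-unpersist"))
    try {
      // Hypothetical stand-in for firstTransformation(inputdata)
      val transformation1: RDD[String] = sc.parallelize(1 to 1000, 4)
        .map(i => if (i % 3 == 0) s"x$i" else s"y$i")
        .cache()
      println("Transformation1 count: " + transformation1.count())

      // Hypothetical stand-in for secondTransformation(transformation1)
      val transformation2 = transformation1.map(_.trim).cache()
      // Materialize transformation2 while transformation1 is still cached...
      transformation2.count()
      // ...after which transformation1's blocks are no longer needed.
      transformation1.unpersist()

      val stillPersisted = sc.getPersistentRDDs.size
      val finalX = transformation2.filter(row => row.contains("x")).count()
      val finalY = transformation2.filter(row => row.contains("y")).count()
      transformation2.unpersist()
      (finalX, finalY, stillPersisted)
    } finally sc.stop()
  }
}
```

The extra `count()` on `transformation2` costs one more job. In exchange, `transformation1` is freed before the final actions instead of after them, which is what makes caching in memory viable here.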