Reputation: 1193
I have a method that gets called several times. This method looks like the following:
def separateGoodAndBad(myRDD: RDD[String]): RDD[String] = {
  val newRDD = myRDD.map(......) //do stuff
  newRDD.cache //newRDD has 2 actions performed on it
  val badRDD = newRDD.filter(row => row.contains("bad"))
  badRDD.count
  val goodRDD = newRDD.filter(row => row.contains("good"))
  goodRDD.count
  newRDD.unpersist // I am unpersisting because this method gets called several times
  goodRDD
}
Like I said, I want to unpersist newRDD because the method gets called several times and I don't want 4 copies of different cached newRDDs. Here is a code sample:
val firstRDD = separateGoodAndBad(originalRDD)
val firstRDDTransformed = doStuffToFirstRDD(firstRDD)
val secondRDD = separateGoodAndBad(firstRDDTransformed)
val secondRDDTransformed = doStuffToSecondRDD(secondRDD)
val thirdRDD = separateGoodAndBad(secondRDDTransformed)
val thirdRDDTransformed = doStuffToThirdRDD(thirdRDD)
However, secondRDD and thirdRDD are taking MUCH longer now that I added the unpersist (see above in separateGoodAndBad()). It seems that they are having to recompute newRDD.
When can I unpersist newRDD so that it never has to get recomputed?
Upvotes: 0
Views: 1236
Reputation: 5616
You may also want to cache goodRDD. It is computed once when you call goodRDD.count, and it will be recomputed (together with newRDD, which has already been unpersisted by then) when you perform an action on that RDD inside the doStuffToFirstRDD method.
def separateGoodAndBad(myRDD: RDD[String]): RDD[String] = {
  val newRDD = myRDD.map(......) //do stuff
  newRDD.cache //newRDD has 2 actions performed on it
  val badRDD = newRDD.filter(row => row.contains("bad"))
  badRDD.count
  val goodRDD = newRDD.filter(row => row.contains("good"))
  goodRDD.cache // this will cache goodRDD to avoid recomputing in next call
  goodRDD.count
  newRDD.unpersist // I am unpersisting because this method gets called several times
  goodRDD
}
Then you can unpersist them outside the function, something like this:
val firstRDD = separateGoodAndBad(originalRDD)
val firstRDDTransformed = doStuffToFirstRDD(firstRDD)
val secondRDD = separateGoodAndBad(firstRDDTransformed)
firstRDD.unpersist //as your secondRDD will be cached by above `separateGoodAndBad` call
val secondRDDTransformed = doStuffToSecondRDD(secondRDD)
val thirdRDD = separateGoodAndBad(secondRDDTransformed)
secondRDD.unpersist //as your thirdRDD will be cached by above `separateGoodAndBad` call
val thirdRDDTransformed = doStuffToThirdRDD(thirdRDD)
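To see the ordering end to end, here is a minimal sketch that can be run in local mode. It is only an illustration under assumptions: the object name UnpersistSketch, the sample strings, the `_.trim` and `_ + " again"` transformations, and the `local[2]` master are all made up for the example and stand in for the elided "do stuff" logic. It uses getStorageLevel to check whether an RDD is still marked as cached. Note that cached partitions can still be evicted under memory pressure, in which case Spark would fall back to recomputing from the lineage.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

object UnpersistSketch {
  // Same shape as separateGoodAndBad above; the map body is a hypothetical placeholder.
  def separateGoodAndBad(myRDD: RDD[String]): RDD[String] = {
    val newRDD = myRDD.map(_.trim).cache()                  // stand-in for "do stuff"
    newRDD.filter(_.contains("bad")).count()                // first action on newRDD
    val goodRDD = newRDD.filter(_.contains("good")).cache() // mark goodRDD for caching
    goodRDD.count()                                         // second action; materializes goodRDD
    newRDD.unpersist()                                      // goodRDD is cached, so newRDD can go
    goodRDD
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("unpersist-sketch").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      val original = sc.parallelize(Seq("good a", "bad b", "good c"))

      val first = separateGoodAndBad(original)
      println(s"first cached: ${first.getStorageLevel != StorageLevel.NONE}")

      // The next call runs actions on data derived from `first`,
      // so `first` must stay cached until that call has returned.
      val second = separateGoodAndBad(first.map(_ + " again"))
      first.unpersist()
      println(s"first cached after unpersist: ${first.getStorageLevel != StorageLevel.NONE}")

      println(s"rows in second: ${second.count()}")
      second.unpersist()
    } finally {
      sc.stop()
    }
  }
}
```

The point of the ordering is that each RDD is unpersisted only after the RDD built from it has been both cached and materialized by an action, so at most two generations are held in memory at any time.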
Upvotes: 2