B. Smith

Reputation: 1193

When can I unpersist my RDD?

I have a method that gets called several times. This method looks like the following:

def separateGoodAndBad(myRDD: RDD[String]): RDD[String] = {
    val newRDD = myRDD.map(......)  //do stuff
    newRDD.cache  //newRDD has 2 actions performed on it

    val badRDD = newRDD.filter(row => row.contains("bad"))
    badRDD.count

    val goodRDD = newRDD.filter(row => row.contains("good"))
    goodRDD.count

    newRDD.unpersist // I am unpersisting because this method gets called several times

    goodRDD
}

As noted in the comment, I want to unpersist newRDD because the method gets called several times and I don't want 4 copies of different cached newRDDs. Here is how the method gets called:

val firstRDD = separateGoodAndBad(originalRDD)
val firstRDDTransformed = doStuffToFirstRDD(firstRDD)

val secondRDD = separateGoodAndBad(firstRDDTransformed)
val secondRDDTransformed = doStuffToSecondRDD(secondRDD)

val thirdRDD = separateGoodAndBad(secondRDDTransformed)
val thirdRDDTransformed = doStuffToThirdRDD(thirdRDD)

However, secondRDD and thirdRDD are taking MUCH longer now that I added the unpersist (see separateGoodAndBad() above). It seems that they have to recompute newRDD.

When can I unpersist newRDD so that it never has to get recomputed?

Upvotes: 0

Views: 1236

Answers (1)

vatsal mevada

Reputation: 5616

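You may want to cache goodRDD as well. It is computed once when you call goodRDD.count, and without caching it is recomputed when you perform an action on that RDD inside the doStuffToFirstRDD method. Because newRDD has already been unpersisted by then, that recomputation also has to rebuild newRDD.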

    def separateGoodAndBad(myRDD: RDD[String]): RDD[String] = {
        val newRDD = myRDD.map(......)  //do stuff
        newRDD.cache  //newRDD has 2 actions performed on it

        val badRDD = newRDD.filter(row => row.contains("bad"))
        badRDD.count

        val goodRDD = newRDD.filter(row => row.contains("good"))
        goodRDD.cache    // this will cache goodRDD to avoid recomputing in next call
        goodRDD.count

        newRDD.unpersist // I am unpersisting because this method gets called several times

        goodRDD
    }

Then you can unpersist each returned RDD outside the function, once the next call has cached its own result, something like this:

val firstRDD = separateGoodAndBad(originalRDD)
val firstRDDTransformed = doStuffToFirstRDD(firstRDD)

val secondRDD = separateGoodAndBad(firstRDDTransformed)
firstRDD.unpersist  //as your secondRDD will be cached by above `separateGoodAndBad` call
val secondRDDTransformed = doStuffToSecondRDD(secondRDD)

val thirdRDD = separateGoodAndBad(secondRDDTransformed)
secondRDD.unpersist  //as your thirdRDD will be cached by above `separateGoodAndBad` call
val thirdRDDTransformed = doStuffToThirdRDD(thirdRDD)
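
If you want to confirm which RDDs are actually cached at each step, one option is to check `getStorageLevel` on the RDD and `getPersistentRDDs` on the SparkContext. The following is only a minimal sketch under some assumptions: it runs against a local master, the `UnpersistCheck` object and `isCached` helper are names made up for illustration, and `map(_.trim)` is a hypothetical stand-in for the elided "do stuff" step.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

object UnpersistCheck {
  // True while the RDD is marked as persisted at any storage level.
  // (Hypothetical helper, not part of Spark.)
  def isCached(rdd: RDD[_]): Boolean = rdd.getStorageLevel != StorageLevel.NONE

  // Same shape as the method above; map(_.trim) is a hypothetical
  // placeholder for the elided "do stuff" step.
  def separateGoodAndBad(myRDD: RDD[String]): RDD[String] = {
    val newRDD = myRDD.map(_.trim).cache()
    newRDD.filter(row => row.contains("bad")).count()
    val goodRDD = newRDD.filter(row => row.contains("good")).cache()
    goodRDD.count()     // materializes goodRDD while newRDD is still cached
    newRDD.unpersist()  // goodRDD now has its own cached copy
    goodRDD
  }

  def main(args: Array[String]): Unit = {
    // Local master and app name are assumptions for a standalone run.
    val conf = new SparkConf().setMaster("local[2]").setAppName("unpersist-check")
    val sc = new SparkContext(conf)
    try {
      val firstRDD = separateGoodAndBad(sc.parallelize(Seq("good a", "bad b", "good c")))
      println(s"firstRDD cached: ${isCached(firstRDD)}")
      println(s"persistent RDD ids: ${sc.getPersistentRDDs.keys.mkString(", ")}")
      firstRDD.unpersist()
      println(s"firstRDD cached after unpersist: ${isCached(firstRDD)}")
    } finally {
      sc.stop()
    }
  }
}
```

Calling `toDebugString` on an RDD is another way to see its lineage and which parents are still cached, which can help when a later step is unexpectedly slow.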

Upvotes: 2
