w4bo

Reputation: 905

Iterative caching vs checkpointing in Spark

I have an iterative application running on Spark that I simplified to the following code:

var anRDD: org.apache.spark.rdd.RDD[Int] = sc.parallelize((0 to 1000))
var c: Long = Int.MaxValue 
var iteration: Int = 0
while (c > 0) {
    iteration += 1
    // Manipulate the RDD and cache the new RDD
    anRDD = anRDD.zipWithIndex.filter(t => t._2 % 2 == 1).map(_._1).cache() //.localCheckpoint()
    // Actually compute the RDD and spawn a new job
    c = anRDD.count()
    println(s"Iteration: $iteration, Values: $c")
}

What happens to the memory allocation across subsequent jobs?

Upvotes: 3

Views: 1882

Answers (2)

simpadjo

Reputation: 4017

Unfortunately, it seems that Spark is not well suited for workloads like this.

Your original implementation is not viable because on each iteration the new RDD holds an internal reference to the previous one, so all the RDDs pile up in memory.

localCheckpoint is an approximation of what you are trying to achieve. It truncates the RDD's lineage, but you lose fault tolerance; this is clearly stated in the documentation for the method.

checkpoint is also an option. It is safe, but it dumps the data to HDFS on each iteration.
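
For illustration, a minimal sketch of how the loop from the question could break the lineage on each iteration; the checkpoint directory is a made-up path, and the commented-out localCheckpoint line is the faster-but-less-safe variant described above:

sc.setCheckpointDir("hdfs:///tmp/spark-checkpoints") // hypothetical directory, required before checkpoint()

var anRDD: org.apache.spark.rdd.RDD[Int] = sc.parallelize(0 to 1000)
var c: Long = Int.MaxValue
while (c > 0) {
    val next = anRDD.zipWithIndex.filter(t => t._2 % 2 == 1).map(_._1).cache()
    next.checkpoint()         // safe: data is written to the checkpoint dir after the next action
    // next.localCheckpoint() // faster, but fault tolerance is lost if an executor dies
    c = next.count()          // triggers the computation (and the checkpoint job)
    anRDD.unpersist()         // free the previous iteration's cached blocks
    anRDD = next
}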

Consider redesigning the approach. Such hacks could bite sooner or later.

Upvotes: 3

wypul

Reputation: 837

  1. RDDs are immutable, so each transformation returns a new RDD, and all of the anRDD versions are kept in memory. Running two iterations of your code shows that the id is different for every cached RDD (screenshot of the cached RDDs with distinct ids).

    So yes, in the long run this can lead to memory pressure or an out-of-memory error, and you should unpersist an RDD once you are done processing it (see the sketch after this list).

  2. localCheckpoint has a different use case than cache: it is used to truncate the lineage of an RDD. Unlike checkpoint, it does not write the RDD to reliable external storage; it improves performance but reduces fault tolerance in return.
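
A minimal sketch of point 1, assuming the loop from the question stays otherwise unchanged: keep a handle to the previous RDD and unpersist it once the new one has been materialized (the name previous is just for illustration):

var anRDD: org.apache.spark.rdd.RDD[Int] = sc.parallelize(0 to 1000)
var c: Long = Int.MaxValue
while (c > 0) {
    val previous = anRDD
    // Each transformation returns a new, immutable RDD with its own id
    anRDD = anRDD.zipWithIndex.filter(t => t._2 % 2 == 1).map(_._1).cache()
    c = anRDD.count()    // materialize the new RDD while the old one is still cached
    previous.unpersist() // drop the old RDD's cached blocks; the lineage itself still grows
}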

Upvotes: 2
