monster

Reputation: 1782

Apache Spark RDD cache and Lineage confusion

I fill an RDD with some random values:

val itemFactors = rddItems.mapValues(newFactors => 
    Vector(Array.fill(2){math.random})
)

I then join that RDD to some other RDD and cache it:

val finalRDD = itemFactors.join(rddItemsUsers).map{
    case(itemid, (itemVector, ((userid, rating), userVector))) => 
        (itemid, itemVector, userid, userVector, rating)}.cache

I then perform a calculation on the data held in finalRDD (a root-mean-square error between the rating and the dot product of the two factor vectors):

sqrt(finalRDD.aggregate(0.0)((accum, item) => 
    accum + pow(item._5 - item._4.dot(item._2), 2), _ + _) / finalRDD.count)

I call the final part of the code, sqrt(...), repeatedly from the console and every single time I get a different result - which is not desired, as I haven't changed anything! This can be remedied (i.e. made so I get a consistent result) in 2 ways:

- by caching itemFactors directly after it is created, rather than only caching finalRDD; or
- by using a fixed number instead of math.random when filling the Array.

Now, I understand that due to lineage, every time itemFactors is evaluated it will call math.random and create new numbers - this will therefore affect my calculation when it's performed. This is why using a fixed number when filling the Array produces a consistent result.
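For illustration, here is a minimal sketch (a hypothetical spark-shell session, with sc the usual SparkContext) of how each action re-runs an uncached, non-deterministic map through the lineage:

// Each action re-evaluates the map, so the random values differ every time.
val randoms = sc.parallelize(1 to 4).map(_ => math.random)
randoms.sum()  // one value
randoms.sum()  // a different value: math.random was called again

// After caching, the first action materializes the partitions and later
// actions reuse them (as long as they stay in memory).
val cachedRandoms = randoms.cache()
cachedRandoms.sum()
cachedRandoms.sum()  // same value as the previous sum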

But the big problem, and the bit which I don't understand, is this: I am caching finalRDD, which is what the calculation is performed on, and as it is built from itemFactors, surely it shouldn't matter what the Array in itemFactors is filled with, since the node is only visited once? I thought I was beginning to get a grasp on lineage; however, this has just thrown me.
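For reference, a sketch of the first remedy, reusing rddItems from the question and assuming Vector is breeze.linalg.DenseVector (an assumption - the question does not show which vector type provides the dot used later):

import breeze.linalg.DenseVector

// Cache itemFactors itself (not just finalRDD) and force materialization with
// an action, so the random factors are computed exactly once; the join then
// always sees the same values.
val itemFactors = rddItems.mapValues(_ => DenseVector(Array.fill(2)(math.random))).cache()
itemFactors.count()  // materializes (and pins) the cached partitions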

Upvotes: 3

Views: 2032

Answers (1)

Karthik

Reputation: 1811

If your cached RDD does not fit in memory, partitions are evicted according to an LRU policy, and an evicted partition is recomputed from its lineage the next time it is needed - which, in your case, re-runs math.random and produces new values.

To avoid that, you can use persist, which takes a storage level as an argument:

import org.apache.spark.storage.StorageLevel

val result = input.map(x => x * x)
result.persist(StorageLevel.MEMORY_ONLY)

MEMORY_ONLY - Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, some partitions will not be cached and will be recomputed on the fly each time they're needed. This is the default level.

MEMORY_AND_DISK - Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, store the partitions that don't fit on disk, and read them from there when they're needed.

MEMORY_ONLY_SER - Store RDD as serialized Java objects (one byte array per partition). This is generally more space-efficient than deserialized objects, especially when using a fast serializer, but more CPU-intensive to read.

MEMORY_AND_DISK_SER - Similar to MEMORY_ONLY_SER, but spill partitions that don't fit in memory to disk instead of recomputing them on the fly each time they're needed.

DISK_ONLY - Store the RDD partitions only on disk.

MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. - Same as the levels above, but replicate each partition on two cluster nodes.

OFF_HEAP (experimental) - Store RDD in serialized format in Tachyon. Compared to MEMORY_ONLY_SER, OFF_HEAP reduces garbage collection overhead and allows executors to be smaller and to share a pool of memory, making it attractive in environments with large heaps or multiple concurrent applications. Furthermore, as the RDDs reside in Tachyon, the crash of an executor does not lead to losing the in-memory cache. In this mode, the memory in Tachyon is discardable. Thus, Tachyon does not attempt to reconstruct a block that it evicts from memory.
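As an illustration of why the storage level matters here, a minimal self-contained sketch (names and numbers are illustrative): with MEMORY_AND_DISK, partitions evicted from memory are read back from disk instead of being recomputed through a non-deterministic lineage:

import org.apache.spark.SparkContext
import org.apache.spark.storage.StorageLevel

// Persist a non-deterministic RDD with MEMORY_AND_DISK: partitions that do
// not fit in memory are spilled to disk and read back later, rather than
// recomputed (which would re-run math.random).
object PersistSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local[*]", "persist-sketch")
    val randoms = sc.parallelize(1 to 1000).map(_ => math.random)
    randoms.persist(StorageLevel.MEMORY_AND_DISK)

    val first = randoms.sum()   // materializes and persists the partitions
    val second = randoms.sum()  // reuses the persisted values
    assert(first == second)     // stable across actions

    sc.stop()
  }
}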

Refer to the RDD Persistence section of the Spark programming guide for more documentation.

Upvotes: 3
