In its simplest form, an RDD is merely a placeholder for a chain of computations whose tasks can be scheduled to execute on any arbitrary machine:
import org.apache.spark.SparkEnv

val src = sc.parallelize(0 to 1000)
val rdd = src.mapPartitions { itr =>
  Iterator(SparkEnv.get.executorId)
}

for (i <- 1 to 3) {
  val vs = rdd.collect()
  println(vs.mkString)
}
/* yielding:
1230123012301230
0321032103210321
2130213021302130
*/
This behaviour can be overridden by persisting any of the upstream RDDs, so that the Spark scheduler minimises redundant computation:
val src = sc.parallelize(0 to 1000)
src.persist()

val rdd = src.mapPartitions { itr =>
  Iterator(SparkEnv.get.executorId)
}

for (i <- 1 to 3) {
  val vs = rdd.collect()
  println(vs.mkString)
}
/* yielding:
2013201320132013
2013201320132013
2013201320132013
each partition now has a fixed executor ID
*/
Now my problem is:
I don't like the vanilla caching mechanism (see this post: In Apache Spark, can I incrementally cache an RDD partition?) and have written my own caching mechanism (by implementing a new RDD). Since the new caching mechanism is only capable of reading existing values from local disk/memory, if there are multiple executors my cache for each partition will frequently be missed whenever that partition is executed by a task on another machine.
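Roughly, it is a wrapper RDD whose compute method first checks an executor-local store before falling back to the parent. The following is a simplified sketch, not my actual implementation, and all names in it are made up:

import scala.collection.concurrent.TrieMap
import scala.reflect.ClassTag

import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.rdd.RDD

// Executor-local store: (rddId, partitionIndex) -> materialised partition.
// An entry only helps if the same partition runs on this executor again.
object LocalPartitionCache {
  val store = TrieMap.empty[(Int, Int), Array[_]]
}

class LocallyCachedRDD[T: ClassTag](prev: RDD[T]) extends RDD[T](prev) {

  override protected def getPartitions: Array[Partition] = firstParent[T].partitions

  override def compute(split: Partition, context: TaskContext): Iterator[T] = {
    val key = (id, split.index)
    LocalPartitionCache.store.get(key) match {
      case Some(arr) =>
        // Hit: this executor has already computed this partition.
        arr.asInstanceOf[Array[T]].iterator
      case None =>
        // Miss: recompute from the parent and remember the result locally.
        val arr = firstParent[T].iterator(split, context).toArray
        LocalPartitionCache.store.put(key, arr)
        arr.iterator
    }
  }
}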
So my question is:
How do I mimic Spark's RDD persistence implementation to ask the DAG scheduler to enforce/suggest locality-aware task scheduling, without actually calling the .persist() method (which is unnecessary in my case)?
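To illustrate the kind of locality hint I have in mind: overriding getPreferredLocations lets an RDD suggest hosts, but only if I already know, per partition, which machine holds the locally cached data. A rough sketch (the class name and the partitionToHost map are hypothetical, purely for illustration):

import scala.reflect.ClassTag

import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.rdd.RDD

class LocalityHintedRDD[T: ClassTag](
    prev: RDD[T],
    partitionToHost: Map[Int, String]  // hypothetical: partition index -> host holding the local cache
) extends RDD[T](prev) {

  override protected def getPartitions: Array[Partition] = firstParent[T].partitions

  override def compute(split: Partition, context: TaskContext): Iterator[T] =
    firstParent[T].iterator(split, context)

  // Suggests (does not enforce) where each partition's task should run.
  override protected def getPreferredLocations(split: Partition): Seq[String] =
    partitionToHost.get(split.index).toSeq
}

This only suggests a location to the scheduler rather than enforcing it, and it presumes I already know the right host for every partition.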