Reputation: 389
I am trying to find the situations in which Spark would skip stages when I am using RDDs. I know that it can skip stages when a shuffle has already happened. So, I wrote the following code to check whether that is true:
def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setMaster("local").setAppName("demo")
  val sc = new SparkContext(conf)
  val d = sc.parallelize(0 until 1000000).map(i => (i % 100000, i))
  val c = d.rightOuterJoin(d.reduceByKey(_ + _)).collect
  val f = d.leftOuterJoin(d.reduceByKey(_ + _)).collect
  val g = d.join(d.reduceByKey(_ + _)).collect
}
On inspecting the Spark UI, I am getting the following jobs with their stages:
I was expecting stage 3 and stage 6 to be skipped, as these were using the same RDD to compute the required joins (given the fact that, in case of a shuffle, Spark automatically caches the shuffle data). Can anyone please explain why I am not seeing any skipped stages here? And how can I modify the code to see the skipped stages? And are there any other situations (apart from shuffling) when Spark is expected to skip stages?
Upvotes: 4
Views: 1754
Reputation: 18023
Actually, it is very simple.
In your case nothing can be skipped because each Action uses a different JOIN type, so each join needs its own shuffle of d and of d.reduceByKey(_ + _) to compute the result. Even with .cache (which you do not use, but should use, to avoid recomputing all the way back to the source on each Action), this would make no difference.
Looking at this simplified version:
val d = sc.parallelize(0 until 100000).map(i => (i % 10000, i)).cache // cached or not, it does not matter here
val c = d.rightOuterJoin(d.reduceByKey(_ + _))
val f = d.leftOuterJoin(d.reduceByKey(_ + _))
c.count
c.collect // stages skipped: reuses the shuffle output of c.count
f.count
f.collect // stages skipped: reuses the shuffle output of f.count
Shows the following Jobs for this App:
(4) Spark Jobs
Job 116 View(Stages: 3/3)
Job 117 View(Stages: 1/1, 2 skipped)
Job 118 View(Stages: 3/3)
Job 119 View(Stages: 1/1, 2 skipped)
You can see that successive Actions based on the same shuffle output cause one or more Stages to be skipped for the second Action / Job on val c or val f. In other words, because c and f each use a single, fixed join type, the two Actions on the same join type run sequentially and profit from the prior work: the second Action can rely on the shuffle files written by the first Action, which apply directly to it. That simple.
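To see this with code closer to the question's original, a minimal sketch (assuming the same local SparkContext setup as in the question; the object and app names here are made up) is to keep a single join type and run two Actions over the one joined RDD, so the second Action can reuse the first one's shuffle output:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object SkippedStagesDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("skipped-stages-demo")
    val sc = new SparkContext(conf)

    val d = sc.parallelize(0 until 1000000).map(i => (i % 100000, i))
    // One join type only, reused by both Actions below.
    val joined = d.join(d.reduceByKey(_ + _))

    joined.count   // first Job: all stages run, shuffle files are written
    joined.collect // second Job: the shuffle-map stages show as "skipped" in the UI

    sc.stop()
  }
}
```

The key difference from the question's code is that the shuffle dependencies of the second Action are identical to those of the first, so Spark can serve them from the existing shuffle files instead of recomputing the stages.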
Upvotes: 3