Raj

Reputation: 2398

parallelize() method in SparkContext

I am trying to understand the effect of passing different numSlices values to the parallelize() method in SparkContext. Given below is the signature of the method:

def parallelize[T](seq: Seq[T], numSlices: Int = defaultParallelism)
(implicit arg0: ClassTag[T]): RDD[T]

I ran spark-shell in local mode

spark-shell --master local

My understanding is that numSlices decides the number of partitions of the resultant RDD (after calling sc.parallelize()). Consider a few examples below.

Case 1

scala> sc.parallelize(1 to 9, 1);
res0: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:22
scala> res0.partitions.size
res2: Int = 1

Case 2

scala> sc.parallelize(1 to 9, 2);
res3: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:22
scala> res3.partitions.size
res4: Int = 2

Case 3

scala> sc.parallelize(1 to 9, 3);
res5: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:22
scala> res3.partitions.size
res6: Int = 2

Case 4

scala> sc.parallelize(1 to 9, 4);
res7: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:22
scala> res3.partitions.size
res8: Int = 2

Question 1: In Case 3 & Case 4, I was expecting the partition size to be 3 & 4 respectively, but both cases have a partition size of only 2. What is the reason for this?

Question 2: In each case there is a number associated with ParallelCollectionRDD[no], i.e. in Case 1 it is ParallelCollectionRDD[0], in Case 2 it is ParallelCollectionRDD[1], and so on. What exactly do those numbers signify?

Upvotes: 18

Views: 54567

Answers (1)

Matthew Gray

Reputation: 1676

Question 1: That's a typo on your part. In Case 3 and Case 4 you're calling res3.partitions.size instead of res5.partitions.size and res7.partitions.size respectively. When I use the correct number, it works as expected.
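
For instance, calling partitions.size on fresh RDDs created with numSlices of 3 and 4 gives the expected counts (the exact res numbers will vary by session):

scala> sc.parallelize(1 to 9, 3).partitions.size
res9: Int = 3

scala> sc.parallelize(1 to 9, 4).partitions.size
res10: Int = 4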

Question 2: That's the id of the RDD in the SparkContext, used for keeping the lineage graph straight. See what happens when I run the same command three times:

scala> sc.parallelize(1 to 9,1)
res0: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:22

scala> sc.parallelize(1 to 9,1)
res1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:22

scala> sc.parallelize(1 to 9,1)
res2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:22

There are now three different RDDs with three different ids. We can run the following to check:

scala> (res0.id, res1.id, res2.id)
res3: (Int, Int, Int) = (0,1,2)
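
You can also see the id in the RDD's lineage string via toDebugString (the exact output may vary slightly by Spark version):

scala> res0.toDebugString
res4: String = (1) ParallelCollectionRDD[0] at parallelize at <console>:22 []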

Upvotes: 24
