Reputation: 18003
Even though RDDs are legacy - and I have a bit of time on hand before the next assignment - I am wondering:
Why can [reduceByKey][1] be called with numPartitions?
Given that it works similarly to combineByKey, i.e. like a combiner in MapReduce, what is the point really?
It is not so obvious to me. In real situations I suspect it is not used often, since in general one has already re-partitioned (based on my own observations).
Upvotes: 1
Views: 1399
Reputation: 9417
I think one major difference you can easily observe is that reduceByKey(func, numPartitions) is executed in a single stage, while repartition(numPartitions) followed by reduceByKey(func) obviously requires two.
Let's check:
scala> val rdd = sc.parallelize(Array(("a", 1),("b", 1),("c",1),("a", 1),("b", 1),("c", 1),("a", 1),("b", 1),("c", 1)),3)
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> def rbk(sumbykey:Int, i:Int) = sumbykey + i
rbk: (sumbykey: Int, i: Int)Int
scala> var res = rdd.reduceByKey(rbk(_,_),6)
res: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[8] at reduceByKey at <console>:27
scala> res.toDebugString
res8: String =
(6) ShuffledRDD[8] at reduceByKey at <console>:27 []
+-(3) ParallelCollectionRDD[0] at parallelize at <console>:24 []
scala> res = rdd.repartition(6).reduceByKey(rbk)
res: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[13] at reduceByKey at <console>:29
scala> res.toDebugString
res9: String =
(6) ShuffledRDD[13] at reduceByKey at <console>:29 []
+-(6) MapPartitionsRDD[12] at repartition at <console>:29 []
| CoalescedRDD[11] at repartition at <console>:29 []
| ShuffledRDD[10] at repartition at <console>:29 []
+-(3) MapPartitionsRDD[9] at repartition at <console>:29 []
| ParallelCollectionRDD[0] at parallelize at <console>:24 []
So if you plan to increase the parallelism of a reduce operation, the reduceByKey(func, numPartitions) version seems to have an advantage.
Upvotes: 2
Reputation: 8325
According to the RDD Programming Guide:
reduceByKey(func, [numPartitions]): When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V,V) => V. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.
So the second parameter controls the amount of parallelisation, similar to groupByKey:
groupByKey([numPartitions]): When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable) pairs. Note: If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using reduceByKey or aggregateByKey will yield much better performance. Note: By default, the level of parallelism in the output depends on the number of partitions of the parent RDD. You can pass an optional numPartitions argument to set a different number of tasks.
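To make that concrete, here is a minimal sketch, assuming a spark-shell session where sc is the SparkContext; the data and the target counts (3 and 12) are made up for illustration:

// Assumed spark-shell session: sc is provided by the shell; data is illustrative.
val pairs = sc.parallelize(Seq(("a", 1), ("b", 1), ("a", 1)), 3)

// Without the argument the reduce side typically inherits the parent's 3 partitions
// (unless spark.default.parallelism is set in the conf).
pairs.reduceByKey(_ + _).getNumPartitions

// With the optional second argument the number of reduce tasks is set explicitly.
pairs.reduceByKey(_ + _, 12).getNumPartitions   // 12

// groupByKey exposes the same optional numPartitions argument.
pairs.groupByKey(12).getNumPartitions           // 12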
Quoting from How is the number of RDD partitions decided in Apache Spark? | Server Fault:
How is this number determined? The way Spark groups RDDs into stages is described in the previous post. (As a quick reminder, transformations like repartition and reduceByKey induce stage boundaries.) The number of tasks in a stage is the same as the number of partitions in the last RDD in the stage. The number of partitions in an RDD is the same as the number of partitions in the RDD on which it depends, with a couple exceptions: the coalesce transformation allows creating an RDD with fewer partitions than its parent RDD, the union transformation creates an RDD with the sum of its parents’ number of partitions, and cartesian creates an RDD with their product.
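A short, hedged sketch of those rules, again assuming sc from a spark-shell (the sizes 4 and 3 are arbitrary):

// Illustrative partition counts only.
val a = sc.parallelize(1 to 100, 4)
val b = sc.parallelize(1 to 100, 3)

a.coalesce(2).getNumPartitions    // 2  - coalesce shrinks the partition count
a.union(b).getNumPartitions       // 7  - sum of the parents' partition counts
a.cartesian(b).getNumPartitions   // 12 - product of the parents' partition counts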
After the question update:
From Optimizing Spark jobs for maximum performance (emphasis mine):
[..]Partitioning characteristics frequently change on shuffle boundaries. Operations that imply a shuffle therefore provide a numPartitions parameter that specify the new partition count (by default the partition count stays the same as in the original RDD).
Since Spark (RDD API) does no optimisation on its own (unlike SQL, which many programmers new to Spark come from), it is imperative for the programmer to optimise the flow and parallelism of operations herself (from the same post).
[..]This might possibly stem from many users’ familiarity with SQL querying languages and their reliance on query optimizations. It is important to realize that the RDD API doesn’t apply any such optimizations.
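In other words, the programmer picks the parallelism at each shuffle boundary. A hedged sketch of how the same optional numPartitions argument shows up on other shuffle operations (sc from a spark-shell assumed; names and values are illustrative):

val left  = sc.parallelize(Seq(("a", 1), ("b", 2)), 2)
val right = sc.parallelize(Seq(("a", "x"), ("b", "y")), 3)

left.distinct(6).getNumPartitions      // 6 - explicit count for the shuffle output
left.join(right, 6).getNumPartitions   // 6 - same knob on join
left.join(right).getNumPartitions      // without it, derived from the parents (here typically 3)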
Furthermore, repartitioning is commonly used but is typically costly, and in various cases it can be avoided or merged with other operations to improve overall performance (see the linked post).
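As a hedged sketch of that last point (sc from a spark-shell assumed; the data and the counts 8 and 2 are illustrative):

val words = sc.parallelize(Seq("spark", "rdd", "spark", "api"), 2).map(w => (w, 1))

// Two shuffles: a standalone repartition, then the reduce.
val twoShuffles = words.repartition(8).reduceByKey(_ + _)

// One shuffle: fold the new partition count into the reduce itself.
val oneShuffle = words.reduceByKey(_ + _, 8)

// If you only need to lower the partition count afterwards, coalesce
// (without a shuffle) is usually cheaper than repartition.
val narrowed = oneShuffle.coalesce(2)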
Upvotes: 2