Reputation: 483
So suppose I have the following transformations:
myRDD = someRDD.map()
mySecondRDD = myRDD.aggregateByKey(initValue)(CombOp , MergeOp)
At this point myRDD doesn't have a partitioner, but mySecondRDD has a HashPartitioner. My questions:
1) Do I have to designate a partitioner for myRDD? And if I do, how can I pass it as an argument to aggregateByKey?
*Note that myRDD is the result of a transformation and has no partitioner
2) At the end of these two commands, shouldn't myRDD have the same partitioner as mySecondRDD instead of none?
3) How many shuffles will these 2 commands trigger?
4) If I designate a partitioner with partitionBy on myRDD and manage to pass it as an argument to aggregateByKey, will I have reduced the shuffles from 2 to 1?
I am sorry, I still don't quite get how this works.
Upvotes: 1
Views: 441
Reputation: 966
I will try to answer your questions:
You don't have to assign a partitioner explicitly. In the code you provided, Spark will assign one automatically: when a shuffle operation such as aggregateByKey() runs on an RDD without a partitioner, a default HashPartitioner is used. To specify your own partitioner, use the overload of aggregateByKey() that accepts a partitioner alongside the initial value. It will look like myRdd.aggregateByKey(initialValue, partitioner)(CombOp, MergeOp).
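A minimal sketch of both overloads, assuming an existing SparkContext sc; the sample data and the partition count 4 are illustrative:

```scala
import org.apache.spark.HashPartitioner

val someRDD = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
val myRDD   = someRDD.map(identity)  // map() yields an RDD with no partitioner

// Default overload: Spark picks a HashPartitioner for you.
val auto = myRDD.aggregateByKey(0)(_ + _, _ + _)

// Explicit overload: the partitioner is passed next to the initial value.
val explicit = myRDD.aggregateByKey(0, new HashPartitioner(4))(_ + _, _ + _)
```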
Your mySecondRDD will use the partitioner from myRDD if myRDD already has one and you do not specify a new partitioner in aggregateByKey() explicitly.
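You can verify this by inspecting the partitioner field on each RDD; a sketch, again assuming a SparkContext sc:

```scala
val myRDD       = sc.parallelize(Seq(("a", 1), ("b", 2))).map(identity)
val mySecondRDD = myRDD.aggregateByKey(0)(_ + _, _ + _)

println(myRDD.partitioner)        // None, because map() drops the partitioner
println(mySecondRDD.partitioner)  // Some(org.apache.spark.HashPartitioner@...)
```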
You will have only 1 shuffle, since the map() transformation does not trigger one. The aggregateByKey(), by contrast, needs to colocate records with the same key on one machine, which requires a shuffle.
You will have only one shuffle even if you leave the code as it is.
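To illustrate question 4: pre-partitioning moves the shuffle rather than removing it. A sketch under the same assumptions (existing SparkContext sc, illustrative names):

```scala
import org.apache.spark.HashPartitioner

val part    = new HashPartitioner(4)
val someRDD = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))

// partitionBy() performs the one shuffle here...
val partitioned = someRDD.map(identity).partitionBy(part)

// ...and aggregateByKey() with the SAME partitioner adds no second shuffle,
// so the total stays at one either way.
val result = partitioned.aggregateByKey(0, part)(_ + _, _ + _)
```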
Upvotes: 1