Reputation: 450
I remember from working with RDDs, that if one key-value RDD (rdd1) has a known partitioning, then performing a join with a different, unpartitioned, key-value RDD (rdd2) would give performance benefits. This is because 1) only the data of rdd2 would need to be transferred across the network, and 2) each element of rdd2 would only need to be transferred to one node rather than all, by applying the partitioning of the key of rdd1 to the key of rdd2
I'm learning about Shuffle Sort Merge Joins with DataFrames. The example in the book I am reading (Learning Spark, 2nd Edition) is for joining two DataFrames based on user_id columns. The example is attempting to demonstrate the elimination of the Exchange stage from the join operation, so, prior to the join, both DataFrames are bucketed into an equal number of buckets by the column to be joined on.
My question is, what happens if only one of the DataFrames has been bucketed? Clearly the Exchange stage will reappear. But if we know that DataFrame1 is bucketed into N buckets by the column we want to join on, will Spark use this bucketing information to efficiently transfer the rows of DataFrame2 over the network, as in the RDD case? Would Spark leave the rows of DataFrame1 where they are, and just apply an identical bucketing to DataFrame2? (Assuming that N buckets results in a reasonable amount of data in the partitions to be joined by the executors) Or instead, does Spark inefficiently shuffle both DataFrames?
In particular, I can imagine a situation where I have a single 'master' DataFrame against which I will need to perform many independent joins with other supplemental DataFrames on the same column. Surely it should only be necessary to pre-bucket the master DataFrame in order to see the performance benefits for all joins? (Although taking the trouble to bucket the supplemental DataFrames wouldn't hurt either, I think)
Upvotes: 0
Views: 1608
Reputation: 18003
https://kb.databricks.com/data/bucketing.html This explains it all with some embellishment over their original postings which I summarize.
Bottom line:
val t1 = spark.table("unbucketed")
val t2 = spark.table("bucketed")
val t3 = spark.table("bucketed")
Unbucketed - bucketed join. Both sides need to be repartitioned.
t1.join(t2, Seq("key")).explain()
Unbucketed with repartition - bucketed join. Unbucketed side is correctly repartitioned, and only one shuffle is needed.
t1.repartition(16, $"key").join(t2, Seq("key")).explain()
Unbucketed with incorrect repartitiong (default(200) - bucketed join. Unbucketed side is incorrectly repartitioned, and two shuffles are needed.
t1.repartition($"key").join(t2, Seq("key")).explain()
bucketed - bucketed join. Ideal case, both sides have the same bucketing, and no shuffles are needed.
t3.join(t2, Seq("key")).explain()
So, both sides need same bucketing for optimal performance.
Upvotes: 1