Reputation: 803
I am trying to understand the distribute by clause and how it could be used in Spark SQL to optimize sort-merge joins.
As per my understanding, the Spark SQL optimizer will distribute the datasets of both participating tables (of the join) based on the join keys (shuffle phase) to co-locate the same keys in the same partition. If that is the case, then when we use distribute by in the SQL, we are doing the same thing ourselves.
So in what way can distribute by be used to improve join performance? Or is it better to use distribute by while writing the data to disk in the load process, so that subsequent queries on this data benefit by not having to shuffle it?
Can you please explain, with a real-world example, how to tune a join using distribute by / cluster by in Spark SQL?
Upvotes: 3
Views: 5201
Reputation: 3344
Let me try to answer each part of your question:
As per my understanding, the Spark SQL optimizer will distribute the datasets of both participating tables (of the join) based on the join keys (shuffle phase) to co-locate the same keys in the same partition. If that is the case, then when we use distribute by in the SQL, we are doing the same thing ourselves.
Yes, that is correct.
So in what way can distribute by be used to improve join performance?
Sometimes one of your tables is already distributed, for example because the table was bucketed, or because the data was aggregated before the join on the same key. In this case, if you explicitly repartition the second table as well (distribute by), you will achieve the same partitioning in both branches of the join, and Spark will not induce any further shuffle in the first branch. This is sometimes referred to as a one-side shuffle-free join, because the shuffle occurs in only one branch of the join: the one on which you call repartition / distribute by. On the other hand, if you don't explicitly repartition the other table, Spark will see that each branch of the join has a different partitioning, and it will therefore shuffle both branches. So in some special cases, calling repartition (distribute by) can save you one shuffle.
Notice that to make this work you also need the same number of partitions in both branches. So if you have two tables that you want to join on the key user_id, and the first table is bucketed into 10 buckets by this key, then you need to repartition the other table into 10 partitions by the same key; the join will then have only one shuffle (in the physical plan you can see that there will be an Exchange operator in only one branch of the join).
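For illustration, here is a minimal Spark SQL sketch of this scenario; the table names orders and events, the columns, and the bucket count of 10 are hypothetical, chosen to match the example above:

```sql
-- Assume `orders` was saved bucketed by user_id into 10 buckets, e.g. via
--   ordersDf.write.bucketBy(10, "user_id").saveAsTable("orders")
-- and `events` is an ordinary, non-bucketed table.

-- Make distribute by produce the same number of partitions as there are buckets:
SET spark.sql.shuffle.partitions = 10;

SELECT o.user_id, o.order_id, e.event_type
FROM orders o
JOIN (
    SELECT user_id, event_type
    FROM events
    DISTRIBUTE BY user_id   -- shuffles only this branch, into 10 partitions
) e
ON o.user_id = e.user_id;
```

Running EXPLAIN on this query should show an Exchange operator only on the events branch; whether the bucketed side is actually read shuffle-free also depends on your Spark version and bucketing configuration.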
Or is it better to use distribute by while writing the data to disk in the load process, so that subsequent queries on this data benefit by not having to shuffle it?
Well, this is actually called bucketing (cluster by), and it allows you to pre-shuffle the data once; each time you afterwards read the data and join it (or aggregate it) by the same key by which you bucketed, it will be free of shuffle. So yes, this is a very common technique: you pay the cost only once when saving the data and then leverage it each time you read it.
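A sketch of that load-time approach in Spark SQL (the table names and the bucket count are again hypothetical):

```sql
-- Pay the shuffle cost once at load time: write both tables bucketed
-- by the join key into the same number of buckets.
CREATE TABLE orders_bucketed
USING parquet
CLUSTERED BY (user_id) INTO 10 BUCKETS
AS SELECT * FROM orders_raw;

CREATE TABLE events_bucketed
USING parquet
CLUSTERED BY (user_id) INTO 10 BUCKETS
AS SELECT * FROM events_raw;

-- Later queries joining on the bucketing key can then avoid the shuffle:
EXPLAIN
SELECT o.user_id, o.order_id, e.event_type
FROM orders_bucketed o
JOIN events_bucketed e ON o.user_id = e.user_id;
```

If you additionally sort within buckets (SORTED BY in SQL, or sortBy in the DataFrame writer), a sort-merge join may also be able to skip the sort step.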
Upvotes: 5