Reputation: 383
I recently got introduced to Spark SQL. I read somewhere about using CLUSTER BY on the join columns (before the join) to improve join performance. Example:
create temporary view prod as
select id, name
from product
cluster by id;
create temporary view cust as
select cid, pid, cname
from customer
cluster by pid;
select p.id, p.name, c.cname
from prod p
join cust c
on p.id = c.pid;
Can anyone please explain in which scenarios this should be leveraged? I understand that data gets shuffled for a join anyway, so what benefit does CLUSTER BY bring, given that it also shuffles the data?
Thanks.
Upvotes: 1
Views: 3656
Reputation: 783
Spark will recognize the CLUSTER BY and shuffle the data. However, if you use the same columns in later queries that induce shuffles, Spark may re-use that exchange instead of shuffling again.
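As a rough sketch (reusing the question's prod view and customer table, so the names are only illustrative), you can check this with EXPLAIN: if the exchange introduced by CLUSTER BY satisfies the join's required distribution, the plan should not add a second Exchange on that side.
spark.sql("CREATE OR REPLACE TEMPORARY VIEW prod AS SELECT id, name FROM product CLUSTER BY id")
// Inspect the physical plan of a later join on the same column.
spark.sql("SELECT p.id, p.name, c.cname FROM prod p JOIN customer c ON p.id = c.pid").explain()
// If the shuffle from CLUSTER BY is reused, the prod side of the join shows
// only that Exchange, rather than an extra one inserted for the join.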
Upvotes: 1
Reputation: 18003
If you use the SQL interface, you can do these things without having to drop into the DataFrame API. CLUSTER BY is the same as:
df.repartition($"key", n).sortWithinPartitions()
Due to lazy evaluation, Spark sees the JOIN and knows you have asked for a repartition by the key - expressed via SQL rather than the statement directly above - so it is just a different interface amounting to the same thing. That makes it easier to stay purely in SQL, and you can intermix the two.
If you do not do it, then Spark will (in general) do it for you when the join requires a shuffle, applying the current shuffle partitions setting.
SET spark.sql.shuffle.partitions = 2
SELECT * FROM df CLUSTER BY key
is the same as:
df.repartition($"key", 2).sortWithinPartitions()
You can also express the repartition as a hint:
spark.sql('''SELECT /*+ REPARTITION(col,..) */ cols... from table''')
UPDATE
The hint does not apply to the JOIN itself when written like this:
val df = spark.sql(""" SELECT /*+ REPARTITION(30, c1) */ T1.c1, T1.c2, T2.c3
FROM T1, T2
WHERE T1.c1 = T2.c1
""")
What this does is repartition the result after processing the JOIN. The JOIN itself will use the higher of the partitioning numbers set on T1 and T2, or the shuffle partitions setting if neither is set explicitly.
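As a small sketch (assuming T1 and T2 are registered as temporary views; the variable name is mine), you can verify both points by checking the shuffle partitions setting the join falls back to and the partition count of the hinted result:
// Partitions the join's shuffle falls back to when neither input is pre-partitioned.
println(spark.conf.get("spark.sql.shuffle.partitions"))
val hinted = spark.sql("""SELECT /*+ REPARTITION(30, c1) */ T1.c1, T1.c2, T2.c3
                          FROM T1, T2
                          WHERE T1.c1 = T2.c1""")
hinted.explain()                      // expect an Exchange hashpartitioning(c1, 30) above the join
println(hinted.rdd.getNumPartitions)  // 30 - the hint repartitions the result after the join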
Upvotes: 2