Matthew
Matthew

Reputation: 383

CLUSTER BY usage with Spark SQL queries

I recently got introduced to Spark-SQL. I read somewhere about using CLUSTER BY on join columns (before the join) to improve join performance. Example:

create temporary view prod as
select id, name
from product
cluster by id; 

create temporary view cust as
select cid, pid, cname
from customer
cluster by pid;

select c.id, p.name, c.name 
from prod p
join cust c
on p.id = c.pid;

Can anyone please explain In which scenarios the same should be leveraged ? I understand that for join, data is shuffled. Then what benefits does CLUSTER BY brings in, since it also shuffles the data ?

Thanks.

Upvotes: 1

Views: 3656

Answers (2)

marie20
marie20

Reputation: 783

Spark will recognize the cluster by and shuffle the data. However, if you use the same columns in later queries that induce shuffles, Spark might re-use the exchange.

Upvotes: 1

Ged
Ged

Reputation: 18003

If you use the SQL interface you can do things without having to use the DF interface.

Cluster By is the same as:

df.repartition($"key", n).sortWithinPartitions()

Due to lazy evaluation, Spark will see the JOIN and know that you indicate you want a repartition by key - via SQL, not like statement directly above - so it is just the interface amounting to the same thing. Makes it easier to stay in SQL mode only. You can intermix.

If you do not do it, then Spark will do it for you (in general) and apply the current shuffle partitions parameter.

SET spark.sql.shuffle.partitions = 2
SELECT * FROM df CLUSTER BY key

is the same as:

df.repartition($"key", 2).sortWithinPartitions()
spark.sql('''SELECT /*+ REPARTITION(col,..) */ cols... from table''')

UPDATE

This does not apply to a JOIN in this way:

val df = spark.sql(""" SELECT /*+ REPARTITION(30, c1) */ T1.c1, T1.c2, T2.c3
                         FROM T1, T2   
                        WHERE T1.c1 = T2.c1
                   """) 

What this does is to repartition after processing the JOIN. The JOIN will use the higher of partitioning nums set on T1 and T2, or shuffle partitions if not set explicitly.

Upvotes: 2

Related Questions