Rakesh Reddy

Reputation: 29

How to join 2 DataFrames in Spark that are already partitioned on the same column without shuffles?

I have 2 DataFrames:

df1:

df2:

Now I want to join df1 and df2 on (df1.col1 = df2.col1 and df1.col2 = df2.col2) without much shuffling.

I tried a plain join but it is taking a lot of time...

How do I do it? Can anyone help?

Upvotes: 2

Views: 1374

Answers (1)

M_S

Reputation: 3733

IMO you can try to use a broadcast join if one of your datasets is small (let's say a few hundred MB). In this case the smaller dataset will be broadcast to every executor and you will skip the shuffle.
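
A minimal sketch of the hint, using the column names from your question and assuming df2 is the smaller side:

import org.apache.spark.sql.functions.broadcast

// Hint the optimizer to broadcast df2 so the join becomes a
// BroadcastHashJoin and no shuffle of df1 is needed.
val joined = df1.join(broadcast(df2), Seq("col1", "col2"))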

Without a broadcast hint, Catalyst is probably going to pick an SMJ (sort-merge join), and for this join algorithm the data needs to be repartitioned by the join key and then sorted. I prepared a quick example:

import org.apache.spark.sql.functions._
import spark.implicits._

// Force 10 shuffle partitions and disable broadcast joins so the
// planner has to fall back to a sort-merge join.
spark.conf.set("spark.sql.shuffle.partitions", "10")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

val data = Seq(("test", 3), ("test", 3), ("test2", 5), ("test3", 7), ("test55", 86))
val data2 = Seq(("test", 3), ("test", 3), ("test2", 5), ("test3", 6), ("test33", 76))

// Pre-repartition both DataFrames by the join column and call an action
// so they are materialized with the same partitioning before the join.
val df = data.toDF("Name", "Value").repartition(5, col("Name"))
df.show
val df2 = data2.toDF("Name", "Value").repartition(5, col("Name"))
df2.show

df.join(df2, Seq("Name", "Value")).show

autoBroadcastJoinThreshold is set to -1 to disable the broadcast join.

spark.sql.shuffle.partitions is set to 10 to show that the join is going to use this value when it repartitions the data.

I repartitioned the DataFrames into 5 partitions before the join and called an action to be sure that they are partitioned by the same column before the join.

And in the SQL tab I can see that Spark is repartitioning the data again:

(Screenshot of the Spark UI SQL tab showing an extra Exchange/repartition step before the join)
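
You can confirm the same thing without the UI by printing the physical plan; the extra Exchange hashpartitioning step on both sides of the SortMergeJoin shows the data being shuffled again despite the earlier repartition:

// Prints the physical plan; look for Exchange hashpartitioning(Name, Value, 10)
// feeding both sides of the SortMergeJoin.
df.join(df2, Seq("Name", "Value")).explain()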

If you can't broadcast and your join is taking a lot of time, you may check whether you have some skew.
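
For example, a quick way to eyeball the key distribution (a sketch, using the column names from the question):

import org.apache.spark.sql.functions.col

// Count rows per join key; if a handful of keys hold most of the rows,
// the sort-merge join will be bottlenecked on a few skewed tasks.
df1.groupBy("col1", "col2")
  .count()
  .orderBy(col("count").desc)
  .show(20, truncate = false)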

You may read this blog post by Dima Statz to find more information about skew in joins.

Upvotes: 2
