Reputation: 29
I have two DataFrames:
df1:
df2:
Now I want to join df1 and df2 on (df1.col1 = df2.col1 and df1.col2 = df2.col2) with as little shuffling as possible.
I tried a plain join, but it is taking a lot of time.
How do I do this? Can anyone help?
Upvotes: 2
Views: 1374
Reputation: 3733
IMO you can try a broadcast join if one of your datasets is small (say, a few hundred MB). In that case the smaller dataset will be broadcast to every executor and you will skip the shuffle.
Without a broadcast hint, Catalyst is probably going to pick an SMJ (sort-merge join), and for that join algorithm the data needs to be repartitioned by the join key and then sorted. I prepared a quick example:
import org.apache.spark.sql.functions._
import spark.implicits._ // needed for toDF outside of spark-shell

// make the shuffle visible: 10 shuffle partitions, broadcast join disabled
spark.conf.set("spark.sql.shuffle.partitions", "10")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

val data = Seq(("test", 3), ("test", 3), ("test2", 5), ("test3", 7), ("test55", 86))
val data2 = Seq(("test", 3), ("test", 3), ("test2", 5), ("test3", 6), ("test33", 76))

// pre-partition both sides by a join column and materialize them with an action
val df = data.toDF("Name", "Value").repartition(5, col("Name"))
df.show
val df2 = data2.toDF("Name", "Value").repartition(5, col("Name"))
df2.show

df.join(df2, Seq("Name", "Value")).show
autoBroadcastJoinThreshold is set to -1 to disable broadcast joins.
spark.sql.shuffle.partitions is set to 10 to show that the join is going to use this value when it repartitions.
I repartitioned the DataFrames into 5 partitions before the join and called an action, to be sure they were partitioned by the same column before the join.
And yet, in the SQL tab I can see that Spark is repartitioning the data again.
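If one side does fit in memory, a broadcast hint avoids the exchange entirely: the hint forces a BroadcastHashJoin even with autoBroadcastJoinThreshold disabled. A minimal self-contained sketch (local SparkSession and toy data are assumptions for illustration):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

val spark = SparkSession.builder.master("local[*]").appName("bhj").getOrCreate()
import spark.implicits._
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

val big   = Seq(("test", 3), ("test2", 5), ("test3", 7)).toDF("Name", "Value")
val small = Seq(("test", 3), ("test3", 6)).toDF("Name", "Value")

// broadcast() hints Catalyst to ship `small` to every executor,
// so `big` is never shuffled for this join
val joined = big.join(broadcast(small), Seq("Name", "Value"))
joined.explain() // physical plan shows BroadcastHashJoin, not SortMergeJoin
joined.show
```

Only the smaller side is broadcast; keep it well under executor memory.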
If you can't broadcast and your join is taking a lot of time, you may check whether you have some skew.
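A quick way to check is to count rows per join key: if a handful of keys dominate the counts, the join is skewed. A sketch (the df1 data here is made up for illustration; run the groupBy on your real df1):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder.master("local[*]").appName("skew-check").getOrCreate()
import spark.implicits._

// made-up data with one "hot" key standing in for a skewed DataFrame
val df1 = Seq(("hot", 1), ("hot", 1), ("hot", 1), ("cold", 2)).toDF("col1", "col2")

// rows per join key, largest first: a few huge counts means skew
val keyCounts = df1.groupBy("col1", "col2").count().orderBy(col("count").desc)
keyCounts.show(10)
```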
You may read this blog post by Dima Statz to find more information about skew in joins.
Upvotes: 2