Reputation: 27373
I have an issue with a join in Spark 2.1. Spark (wrongly?) chooses a broadcast hash join although the table is very large (14 million rows). The job then crashes because there is not enough memory; Spark somehow tries to persist the broadcast pieces to disk, which then leads to a timeout.
So, I know there is a query hint to force a broadcast join (org.apache.spark.sql.functions.broadcast), but is there also a way to force another join algorithm?
I solved my issue by setting spark.sql.autoBroadcastJoinThreshold=0, but I would prefer a more granular solution, i.e. one that does not disable broadcast joins globally.
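For context, the workaround looks roughly like this (a sketch; largeDf and otherDf are placeholder DataFrames and a running SparkSession `spark` is assumed):

```scala
// Setting the threshold to 0 (or any negative value) tells Spark never to
// broadcast a relation based on its estimated size -- for the whole session.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 0)

// The join is then planned with a non-broadcast strategy (e.g. sort merge join).
val joined = largeDf.join(otherDf, Seq("id"))
```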
Upvotes: 10
Views: 11207
Reputation: 1152
As of Spark 3 you can leverage join hints.
Either programmatically:
// shuffle hash
df1.join(df2.hint("shuffle_hash"), df1("id") === df2("id"), "left")
// sort merge
df1.join(df2.hint("shuffle_merge"), df1("id") === df2("id"), "left")
You can also use the SQL hint syntax:
-- Join Hints for shuffle sort merge join
SELECT /*+ SHUFFLE_MERGE(t1) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key;
-- Join Hints for shuffle hash join
SELECT /*+ SHUFFLE_HASH(t1) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key;
Upvotes: 2
Reputation: 13
If you have a non-inner, non-equi join, a broadcast nested loop join can be the only possibility for Spark to perform the join. According to the source code comments from Spark: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala#L311
// If it is not an equi-join, we first look at the join hints w.r.t. the following order:
// 1. broadcast hint: pick broadcast nested loop join. If both sides have the broadcast
// hints, choose the smaller side (based on stats) to broadcast for inner and full joins,
// choose the left side for right join, and choose right side for left join.
// 2. shuffle replicate NL hint: pick cartesian product if join type is inner like.
//
// If there is no hint or the hints are not applicable, we follow these rules one by one:
// 1. Pick broadcast nested loop join if one side is small enough to broadcast. If only left
// side is broadcast-able and it's left join, or only right side is broadcast-able and
// it's right join, we skip this rule. If both sides are small, broadcasts the smaller
// side for inner and full joins, broadcasts the left side for right join, and broadcasts
// right side for left join.
// 2. Pick cartesian product if join type is inner like.
// 3. Pick broadcast nested loop join as the final solution. It may OOM but we don't have
// other choice. It broadcasts the smaller side for inner and full joins, broadcasts the
// left side for right join, and broadcasts right side for left join.
Also, in some circumstances it might be the case for an equi-join too: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala#L311
// If it is an equi-join, we first look at the join hints w.r.t. the following order:
// 1. broadcast hint: pick broadcast hash join if the join type is supported. If both sides
// have the broadcast hints, choose the smaller side (based on stats) to broadcast.
// 2. sort merge hint: pick sort merge join if join keys are sortable.
// 3. shuffle hash hint: We pick shuffle hash join if the join type is supported. If both
// sides have the shuffle hash hints, choose the smaller side (based on stats) as the
// build side.
// 4. shuffle replicate NL hint: pick cartesian product if join type is inner like.
//
// If there is no hint or the hints are not applicable, we follow these rules one by one:
// 1. Pick broadcast hash join if one side is small enough to broadcast, and the join type
// is supported. If both sides are small, choose the smaller side (based on stats)
// to broadcast.
// 2. Pick shuffle hash join if one side is small enough to build local hash map, and is
// much smaller than the other side, and `spark.sql.join.preferSortMergeJoin` is false.
// 3. Pick sort merge join if the join keys are sortable.
// 4. Pick cartesian product if join type is inner like.
// 5. Pick broadcast nested loop join as the final solution. It may OOM but we don't have
// other choice.
So, in some cases Spark can use a broadcast join automatically even though it is disabled and not hinted, because the broadcast nested loop join is used as the fallback join type when no other type is suitable.
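The fallback can be observed with a small sketch (assumes a running SparkSession `spark`): a non-inner, non-equi join has no hash-based or sort-based strategy, so even with automatic broadcast disabled the planner falls back to a broadcast nested loop join.

```scala
// Disable size-based automatic broadcast.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

val a = spark.range(100).toDF("x")
val b = spark.range(100).toDF("y")

// Non-equi condition on a left outer join: a cartesian product is only
// allowed for inner-like joins, so the only remaining strategy is
// broadcast nested loop join.
a.join(b, a("x") < b("y"), "left").explain()
// The physical plan shows BroadcastNestedLoopJoin despite the -1 threshold.
```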
Upvotes: 1
Reputation: 51
Along with setting spark.sql.autoBroadcastJoinThreshold to 0 or a negative value as per Jacek's response, check the state of spark.sql.join.preferSortMergeJoin:
Hint for sort merge join: set the above conf to true.
Hint for shuffled hash join: set the above conf to false.
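Combining the two settings looks roughly like this (a sketch; df1 and df2 are placeholder DataFrames and a running SparkSession `spark` is assumed — note that a shuffled hash join is only picked when the planner's size conditions are also met, i.e. one side is small enough to build a local hash map and much smaller than the other):

```scala
// Rule out broadcast hash join entirely.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
// Prefer shuffled hash join over sort merge join where applicable.
spark.conf.set("spark.sql.join.preferSortMergeJoin", false)

df1.join(df2, Seq("id")).explain()
// If the size conditions hold, the physical plan shows ShuffledHashJoin
// instead of SortMergeJoin.
```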
Upvotes: 4
Reputation: 74619
If a broadcast hash join can be used (by the broadcast hint or by the total size of a relation), Spark SQL chooses it over other joins (see the JoinSelection execution planning strategy).
With that said, to avoid it, don't force a broadcast hash join (using the broadcast standard function on the left or right join side), and disable the preference for one by setting spark.sql.autoBroadcastJoinThreshold to 0 or a negative value.
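A minimal sketch of this (assumes a running SparkSession `spark`): with the threshold disabled and no broadcast hint, JoinSelection falls through to a sort merge join for an equi-join with sortable keys.

```scala
// Disable the size-based preference for broadcast hash join.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

val left  = spark.range(1000000L).toDF("id")
val right = spark.range(1000000L).toDF("id")

// Equi-join on a sortable key, no broadcast() call on either side.
left.join(right, "id").explain()
// Expect SortMergeJoin in the physical plan rather than BroadcastHashJoin.
```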
Upvotes: 5