sujit

Reputation: 2328

Why is Spark (SQL) not doing a broadcast join even when the size is under autoBroadcastJoinThreshold?

I am using Spark 2.1.1. I have a very complex query written in Spark SQL that I am trying to optimise. For one section, I am trying to use a broadcast join. But even though I have set:

spark.sql.autoBroadcastJoinThreshold=1073741824

which is 1 GB, I see that the Spark-generated physical plan for this section of the execution still uses SortMergeJoin. Do you have any insights into why a broadcast join is not used, even though one side's size is shown as much smaller (in MBs) on the Spark UI -> SQL tab?
My SQL code for the affected section looks like:

-- Preceding SQL
(
SELECT  /*+ BROADCAST (a) */   -- Size of a is within broadcast threshold as per UI
     a.id,
     big.gid
 FROM
     (SELECT DISTINCT(id) AS id FROM a_par WHERE gid IS NULL) a
 JOIN
    big ON (a.id=big.id)
)
-- Succeeding SQL
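
One way to confirm which strategy the planner chose is to print the physical plan. A minimal spark-shell sketch for the fragment above (look for BroadcastHashJoin vs. SortMergeJoin in the output):

// Print the physical plan for the join fragment above.
spark.sql("""
  SELECT a.id, big.gid
  FROM (SELECT DISTINCT(id) AS id FROM a_par WHERE gid IS NULL) a
  JOIN big ON (a.id = big.id)
""").explain()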

The Spark UI screen that corroborates this:

[Spark UI screenshot, SQL tab]

Upvotes: 7

Views: 4268

Answers (3)

valiano

Reputation: 18581

To add on top of @Constantine's answer, as of 2022:

As of Spark 3.0.0, Adaptive Query Execution (AQE) is supported; it leverages runtime table statistics to choose the most efficient join strategy. In my experience it converts SortMergeJoins to BroadcastJoins very effectively.

From the Spark 3.1.1 documentation:

AQE converts sort-merge join to broadcast hash join when the runtime statistics of any join side is smaller than the broadcast hash join threshold. This is not as efficient as planning a broadcast hash join in the first place, but it’s better than keep doing the sort-merge join, as we can save the sorting of both the join sides, and read shuffle files locally to save network traffic.

When using Spark 3.x versions prior to 3.2.0, AQE is disabled by default and can be enabled with:

spark.sql.adaptive.enabled=true

Starting with Spark 3.2.0, AQE is enabled by default.
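
For illustration, a minimal spark-shell sketch for enabling AQE on 3.0.x/3.1.x (a no-op from 3.2.0 onward, where it is already the default):

// Enable Adaptive Query Execution at runtime.
spark.conf.set("spark.sql.adaptive.enabled", "true")
// With AQE on, a SortMergeJoin in the initial plan can be replaced by a
// BroadcastHashJoin once the actual size of a join side is known at runtime.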

More details on AQE can be found in the Databricks blog announcement:
Adaptive Query Execution: Speeding Up Spark SQL at Runtime.

Upvotes: 1

sujit

Reputation: 2328

Below are my observations and the way I made it work:

In Spark 2.1:

  • In Spark SQL:

    • The broadcast hint is of NO use.
    • Spark applies BroadcastHashJoin only if it can compute the size of the DataFrame (as per an earlier answer).
    • This happens only if one of the join sides is a bare table (in my case, a Hive table).
  • In the Spark shell:

    • We can force a DataFrame to be broadcast using broadcast(df) (see the sketch below).
    • If the DataFrame is not small enough per the configured thresholds, it will fail the whole job.

So, in short, I couldn't find any way to achieve it through Spark SQL alone. I had to introduce a broadcast(df) to enforce the broadcast join.
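
For reference, a minimal spark-shell sketch of that workaround (table and column names taken from the question; this is an illustration, not the exact code I used):

import org.apache.spark.sql.functions.broadcast

// broadcast(a) marks the small side for a broadcast join regardless of
// whether Spark could compute its size from statistics.
val a   = spark.sql("SELECT DISTINCT(id) AS id FROM a_par WHERE gid IS NULL")
val big = spark.table("big")

val joined = big.join(broadcast(a), "id").select("id", "gid")
joined.explain()  // the plan should now show BroadcastHashJoin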

Upvotes: 3

Constantine

Reputation: 1416

Spark (as of 2.1) doesn't support adaptive execution. It doesn't change the execution plan based on intermediate statistics (like size, max, min, etc.) after a stage is completed. So once the plan is generated before query execution, it's not changed, and you keep seeing the same plan.

The reason Spark is not broadcasting the left table is the missing statistics for your subquery. The way I overcame this issue was by caching the result of the subquery, which helps Spark optimize the plan.

In your case, you can do something like:

CACHE TABLE cached_a AS SELECT DISTINCT(id) AS id FROM a_par WHERE gid IS NULL;

SELECT
     a.id,
     big.gid
 FROM
     cached_a a
 JOIN
     big ON (a.id = big.id)
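
This works because CACHE TABLE ... AS SELECT eagerly materializes the subquery, giving Spark a concrete size to compare against spark.sql.autoBroadcastJoinThreshold. A roughly equivalent DataFrame-API sketch (hypothetical; same table names as above):

// Materializing the cached subquery gives the planner concrete size
// statistics, enabling the broadcast decision.
val a = spark.sql("SELECT DISTINCT(id) AS id FROM a_par WHERE gid IS NULL").cache()
a.count()  // force materialization so size statistics are available
a.createOrReplaceTempView("cached_a")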

Upvotes: 6
