Reputation: 2328
I am using Spark 2.1.1. I have a very complex query written in Spark SQL that I am trying to optimise. For one section of it, I am trying to use a broadcast join. But even though I have set:
spark.sql.autoBroadcastJoinThreshold=1073741824
which is 1 GB, I see that the Spark-generated physical plan for this section of the execution still uses SortMergeJoin. Do you have any insights into why the broadcast join is not used, even though one side's size is shown as much smaller (in MBs) on the Spark UI -> SQL tab?
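For reference, I set the threshold roughly like this (a spark-shell/Scala sketch; the default `spark` session is assumed):
// Raise the auto-broadcast threshold to 1 GB (assumes the spark-shell session `spark`)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 1073741824L)
// or, equivalently, from SQL:
spark.sql("SET spark.sql.autoBroadcastJoinThreshold=1073741824")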
The SQL for the affected section looks like:
-- Preceding SQL
(
SELECT /*+ BROADCAST (a) */ -- Size of a is within broadcast threshold as per UI
a.id,
big.gid
FROM
(SELECT DISTINCT(id) AS id FROM a_par WHERE gid IS NULL) a
JOIN
big ON (a.id=big.id)
)
-- Succeeding SQL
The Spark UI screenshot that corroborates this is below:
Upvotes: 7
Views: 4268
Reputation: 18581
To add to @Constantine's answer, as of 2022:
As of Spark 3.0.0, Adaptive Query Execution (AQE) is supported, which leverages runtime statistics to choose the most effective join strategy. In my experience it converts SortMergeJoins to BroadcastJoins very effectively.
From the Spark 3.1.1 documentation:
AQE converts sort-merge join to broadcast hash join when the runtime statistics of any join side is smaller than the broadcast hash join threshold. This is not as efficient as planning a broadcast hash join in the first place, but it’s better than keep doing the sort-merge join, as we can save the sorting of both the join sides, and read shuffle files locally to save network traffic.
When using Spark 3.x versions prior to 3.2.0, AQE is disabled by default and can be enabled with:
spark.sql.adaptive.enabled=true
Starting with Spark 3.2.0, AQE is enabled by default.
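For versions where it is off by default, a minimal sketch of enabling it (Scala; the app name is just a placeholder):
import org.apache.spark.sql.SparkSession

// Enable AQE explicitly (needed on Spark 3.x before 3.2.0)
val spark = SparkSession.builder()
  .appName("aqe-demo")                          // placeholder app name
  .config("spark.sql.adaptive.enabled", "true")
  .getOrCreate()

// or, on an already running session:
spark.conf.set("spark.sql.adaptive.enabled", "true")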
More details on AQE can be found in the Databricks blog announcement:
Adaptive Query Execution: Speeding Up Spark SQL at Runtime.
Upvotes: 1
Reputation: 2328
Below are my observations and the way I made it work, in Spark 2.1:
In Spark SQL: the planner picks BroadcastHashJoin only if it can compute the size of the dataframe (as per an earlier answer).
In spark-shell: I had to introduce a broadcast(df) to enforce the broadcast join.
So, in short, I couldn't find any way to achieve it through Spark SQL alone.
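A minimal sketch of what that looked like in spark-shell (Scala; table and column names taken from the question, `spark` is the shell's session):
import org.apache.spark.sql.functions.broadcast

// Same sub-select as in the question: distinct ids from a_par where gid is null
val a = spark.table("a_par").where("gid IS NULL").select("id").distinct()
val big = spark.table("big")

// broadcast(a) marks this side for broadcasting explicitly,
// so the planner does not have to estimate its size
val joined = broadcast(a).join(big, Seq("id")).select("id", "gid")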
Upvotes: 3
Reputation: 1416
Spark (as of 2.x) doesn't support adaptive execution. It doesn't change the execution plan based on intermediate statistics (like size, max, min, etc.) after a stage is completed. So once the plan is generated before query execution, it is not changed, and you will keep seeing the same plan.
The reason why Spark is not broadcasting the left table is the missing statistics for your subquery. The way I overcame this issue is by caching the result of the subquery, which helps Spark optimize the plan.
In your case, you can do something like:
CACHE TABLE cached_a as SELECT DISTINCT(id) AS id FROM a_par WHERE gid IS NULL;
SELECT
a.id,
big.gid
FROM
cached_a a
JOIN
big ON (a.id=big.id)
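Equivalently from the DataFrame API (a sketch, assuming a spark-shell session `spark`; the variable name `small` is mine, table and column names are from the question):
// Cache the de-duplicated small side so Spark has statistics for it
val small = spark.sql("SELECT DISTINCT(id) AS id FROM a_par WHERE gid IS NULL").cache()
small.count()                              // materialize the cache
small.createOrReplaceTempView("cached_a")

spark.sql("""
  SELECT a.id, big.gid
  FROM cached_a a
  JOIN big ON (a.id = big.id)
""")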
Upvotes: 6