Reputation: 783
I am new to Spark-SQL, which I use to read Hive tables. I want to know how Spark performs a multi-table join. I read somewhere that it is recommended to always keep the largest table at the top of the join order, and so on, as this is conducive to join efficiency. I also read that in a join, Spark loads the first table in the order (the largest) into memory and streams the other table, which aids join performance. However, I am confused as to how this strategy boosts performance, since the largest table will in most cases not fit in memory and will spill to disk.
Can anyone please clarify and explain the joining mechanism employed by Spark under the hood when joining [large vs. medium], [large vs. small] and [large vs. large] tables, in terms of join types (inner & outer) and join performance? I want to know the best practices to follow for join table ordering to achieve optimal performance with all the join strategies (SMJ, ShuffleHash & Broadcast) employed by Spark. Let's assume the following query:
select
a.id,
b.cust_nm,
c.prod_nm
from large_tab a
join medium_tab b
on a.id = b.id
join small_tab c
on a.pid = c.pid;
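For illustration, here is a sketch of the same query in the DataFrame API with an explicit broadcast hint on the small table (it assumes the three tables are visible through the Hive metastore); this is the kind of broadcast strategy I am asking about:
from pyspark.sql.functions import broadcast

# Sketch only: large_tab, medium_tab and small_tab are the tables from the query above.
a = spark.table("large_tab")
b = spark.table("medium_tab")
c = spark.table("small_tab")

result = (
    a.join(b, a["id"] == b["id"])                # large vs. medium
     .join(broadcast(c), a["pid"] == c["pid"])   # small side hinted for broadcast
     .select(a["id"], b["cust_nm"], c["prod_nm"])
)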
Note: We use Spark 2.4
Any help is much appreciated. Thanks.
Upvotes: 6
Views: 6016
Reputation: 3344
Regarding the order of the joins, Spark provides the functionality to find the optimal configuration (order) of the tables in the join, but it depends on some configuration settings (the code below uses the PySpark API):
spark.conf.set("spark.sql.cbo.enabled", True)
spark.conf.set("spark.sql.cbo.joinReorder.enabled", True)
spark.sql("ANALYZE TABLE table_name COMPUTE STATISTICS FRO COLUMNS col1, col2, ...")
Computing the statistics is quite important here, because based on them Spark will estimate the size of the tables in the join and reorder them accordingly. For even better estimates, you can also enable histogram computation for the columns (this is also off by default in 2.4):
spark.conf.set("spark.sql.statistics.histogram.enabled", True)
The maximum number of tables for which this joinReorder can be used is controlled by the setting spark.sql.cbo.joinReorder.dp.threshold, and the default value is 12.
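It is an ordinary SQL configuration, so it can be adjusted the same way as the settings above, for example:
spark.conf.set("spark.sql.cbo.joinReorder.dp.threshold", 12)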
Upvotes: 6