Reputation: 1491
I want to use the BROADCAST hint on multiple small tables while joining them with a large table. In the example below, SMALLTABLE2 is joined multiple times with LARGETABLE on different join columns. To get better performance, I want both SMALLTABLE1 and SMALLTABLE2 to be broadcast. Can this be achieved by simply adding the hint /*+ BROADCAST(B,C,D,E) */, or is there a better solution? I populate SMALLTABLE1 and SMALLTABLE2 by querying Hive tables into DataFrames and then using createOrReplaceTempView to register them as the views SMALLTABLE1 and SMALLTABLE2, which are later used in the query below.
Is there any way to broadcast a view created with the createOrReplaceTempView function?
SELECT A.COL1, A.COL2, A.COL3, B.COL4, C.COL5, D.COL6, E.COL7
FROM LARGETABLE A
JOIN SMALLTABLE1 B
ON A.LCOL = B.SCOL
JOIN SMALLTABLE2 C
ON A.LCOL1 = C.SCOL
JOIN SMALLTABLE2 D
ON A.LCOL2 = D.SCOL
JOIN SMALLTABLE2 E
ON A.LCOL3 = E.SCOL
Upvotes: 1
Views: 5491
Reputation: 31490
If you are using Spark 2.2+, then you can use any of these hints: MAPJOIN/BROADCAST/BROADCASTJOIN.
Refer to this Jira and this for more details regarding this functionality.
Example: below I have used the BROADCAST hint, but the MAPJOIN/BROADCASTJOIN hints would produce the same explain plan.
>>> spark.range(1000000000).createOrReplaceTempView("t")
>>> spark.range(1000000000).createOrReplaceTempView("u")
>>> sql("select /*+ Broadcast(t,u) */ * from t join u on t.id=u.id").explain()
== Physical Plan ==
*BroadcastHashJoin [id#0L], [id#16L], Inner, BuildRight
:- *Range (0, 1000000000, step=1, splits=56)
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
+- *Range (0, 1000000000, step=1, splits=56)
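To answer the original question directly: the hint works on temp views registered with createOrReplaceTempView, and a single hint can name several relation aliases. A sketch of the hinted query, assuming the views from the question already exist (here we only build the SQL string; running it requires an active SparkSession):

```python
# The BROADCAST hint accepts multiple aliases, so both small tables
# (including each alias of SMALLTABLE2) can be listed at once.
# Assumes LARGETABLE, SMALLTABLE1 and SMALLTABLE2 are registered temp views.
query = """
SELECT /*+ BROADCAST(B,C,D,E) */
       A.COL1, A.COL2, A.COL3, B.COL4, C.COL5, D.COL6, E.COL7
FROM LARGETABLE A
JOIN SMALLTABLE1 B ON A.LCOL  = B.SCOL
JOIN SMALLTABLE2 C ON A.LCOL1 = C.SCOL
JOIN SMALLTABLE2 D ON A.LCOL2 = D.SCOL
JOIN SMALLTABLE2 E ON A.LCOL3 = E.SCOL
"""
# With an active SparkSession:
# spark.sql(query).explain()
```

If the hint is honored, the explain plan should show a BroadcastHashJoin for each of B, C, D, and E instead of sort-merge joins.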
(or)
If you are using Spark < 2, then we need to use the DataFrame API to persist the DataFrame and then register it as a temp table; that way we can achieve an in-memory join.
>>> df=hc.range(10000000)
>>> df.persist()  # persist the df in memory
>>> df.registerTempTable("u")  # register temp table
>>> df1=hc.range(10000000)
>>> df1.persist()
>>> df1.registerTempTable("t")
>>> hc.sql("select * from t join u on t.id=u.id").explain()
== Physical Plan ==
Project [id#11L,id#26L]
+- SortMergeJoin [id#11L], [id#26L]
:- Sort [id#11L ASC], false, 0
: +- TungstenExchange hashpartitioning(id#11L,200), None
: +- InMemoryColumnarTableScan [id#11L], InMemoryRelation [id#11L], true, 10000, StorageLevel(false, true, false, false, 1), ConvertToUnsafe, None
+- Sort [id#26L ASC], false, 0
+- TungstenExchange hashpartitioning(id#26L,200), None
+- InMemoryColumnarTableScan [id#26L], InMemoryRelation [id#26L], true, 10000, StorageLevel(false, true, false, false, 1), ConvertToUnsafe, None
Using DataFrames directly, without creating any temp tables:
>>> from pyspark.sql.functions import *
>>> df=hc.range(10000000)
>>> df1=hc.range(10000000)
>>> df.join(broadcast(df1),['id']).explain()
== Physical Plan ==
Project [id#26L]
+- BroadcastHashJoin [id#26L], [id#11L], BuildRight
:- ConvertToUnsafe
: +- Scan ExistingRDD[id#26L]
+- ConvertToUnsafe
+- Scan ExistingRDD[id#11L]
In addition, broadcast joins are done automatically in Spark. This is controlled by the parameter "spark.sql.autoBroadcastJoinThreshold", which is set to 10 MB by default.
To change the default value:
conf.set("spark.sql.autoBroadcastJoinThreshold", 1024*1024*<mb_value>)
For more info, refer to this link regarding spark.sql.autoBroadcastJoinThreshold.
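The threshold is specified in bytes, so the MB-to-bytes conversion above matters. A minimal sketch of the arithmetic (the `spark.conf.set` call in the comments assumes an active SparkSession, which is not created here):

```python
# spark.sql.autoBroadcastJoinThreshold is given in bytes.
mb_value = 10  # desired threshold in MB
threshold_bytes = 1024 * 1024 * mb_value
print(threshold_bytes)  # 10485760 bytes, i.e. the 10 MB default

# With an active SparkSession (assumption):
# spark.conf.set("spark.sql.autoBroadcastJoinThreshold", str(threshold_bytes))
# Setting the threshold to -1 disables automatic broadcast joins entirely.
```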
Upvotes: 2