Reputation: 727
I am trying to efficiently join two DataFrames, one of which is large and the other a bit smaller.
Is there a way to avoid all this shuffling? I cannot set autoBroadcastJoinThreshold, because it supports only Integers, and the table I am trying to broadcast is slightly bigger than the maximum Integer number of bytes.
Is there a way to force a broadcast, ignoring this variable?
Upvotes: 51
Views: 114828
Reputation: 29195
###Broadcast Hash Joins (similar to map-side join or map-side combine in MapReduce):
In Spark SQL you can see the type of join being performed by calling queryExecution.executedPlan. As with core Spark, if one of the tables is much smaller than the other you may want a broadcast hash join. You can hint to Spark SQL that a given DataFrame should be broadcast for the join by calling the broadcast method on that DataFrame before joining it.
Example:
import org.apache.spark.sql.functions.broadcast
// Example: Broadcasting a small dimension table for an efficient join with a large fact table
largedataframe.join(broadcast(smalldataframe), "key")
In data warehouse (DWH) terms, largedataframe plays the role of the fact table and smalldataframe the role of the dimension table.
This approach is described in my favourite book, High Performance Spark (HPS).
Note: the broadcast used above is org.apache.spark.sql.functions.broadcast, not SparkContext.broadcast.
Spark also automatically uses spark.sql.autoBroadcastJoinThreshold (10 MB by default) to determine whether a table should be broadcast.
###Tip: see the DataFrame.explain() method
def explain(): Unit
Prints the physical plan to the console for debugging purposes.
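To make the tip concrete, here is a minimal sketch of how one might confirm that a hinted join is planned as a broadcast hash join. The object name BroadcastPlanCheck, the toy data, and the column names are made up for illustration, and a local SparkSession is assumed.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

// Hypothetical helper, not part of Spark: builds two toy DataFrames,
// joins them with a broadcast hint, and returns the physical plan as text.
object BroadcastPlanCheck {
  def hintedPlan(spark: SparkSession): String = {
    import spark.implicits._
    // Stand-ins for a large fact table and a small dimension table.
    val largeDF = (1 to 1000).map(i => (i % 10, s"row$i")).toDF("key", "payload")
    val smallDF = (0 until 10).map(i => (i, s"dim$i")).toDF("key", "label")

    val joined = largeDF.join(broadcast(smallDF), "key")
    joined.explain() // prints the physical plan to the console
    joined.queryExecution.executedPlan.toString
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("plan-check").getOrCreate()
    try {
      // Look for the join operator name in the plan text.
      println(hintedPlan(spark).contains("BroadcastHashJoin"))
    } finally spark.stop()
  }
}
```

Searching the plan text for the operator name is a quick check for experiments; reading the explain() output directly is usually enough in practice.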
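Is there a way to force broadcast ignoring this variable?
Yes: the broadcast hint shown above applies regardless of the threshold. Note that the following setting does not force a broadcast; it disables automatic (size-based) broadcasting:
sqlContext.sql("SET spark.sql.autoBroadcastJoinThreshold = -1")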
NOTE: As an aside for Hive (not Spark), a similar effect can be achieved with the Hive hint MAPJOIN, as shown below:
Select /*+ MAPJOIN(b) */ a.key, a.value from a join b on a.key = b.key
hive> set hive.auto.convert.join=true;
hive> set hive.auto.convert.join.noconditionaltask.size=20971520;
hive> set hive.auto.convert.join.noconditionaltask=true;
hive> set hive.auto.convert.join.use.nonstaged=true;
hive> set hive.mapjoin.smalltable.filesize=30000000; -- default is 25 MB, raised here to 30 MB
Further reading: please refer to my article on BHJ, SHJ, and SMJ (broadcast hash join, shuffle hash join, and sort-merge join).
Upvotes: 97
Reputation: 18505
Join hints take precedence over the autoBroadcastJoinThreshold configuration, so using a hint will always ignore that threshold.
In addition, when a join hint is used, Adaptive Query Execution (available since Spark 3.x) will not change the strategy given in the hint.
In Spark SQL you can apply join hints as shown below:
SELECT /*+ BROADCAST(b) */ a.id, a.value FROM a JOIN b ON a.id = b.id
SELECT /*+ BROADCASTJOIN(b) */ a.id, a.value FROM a JOIN b ON a.id = b.id
SELECT /*+ MAPJOIN(b) */ a.id, a.value FROM a JOIN b ON a.id = b.id
Note that the keywords BROADCAST, BROADCASTJOIN, and MAPJOIN are all aliases, as defined in hints.scala in the Spark source.
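As a rough illustration of a hint overriding the threshold, the sketch below registers two tiny temporary views, switches off size-based broadcasting, and then runs a hinted query. The object name SqlBroadcastHint and the sample rows are assumptions for the example; the view names a and b mirror the queries above, and the hint names the relation to broadcast.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical helper, not part of Spark.
object SqlBroadcastHint {
  def hintedSqlPlan(spark: SparkSession): String = {
    import spark.implicits._
    Seq((1, "x"), (2, "y")).toDF("id", "value").createOrReplaceTempView("a")
    Seq((1, "p"), (2, "q")).toDF("id", "other").createOrReplaceTempView("b")

    // Disable size-based automatic broadcasting so that only the hint
    // can cause a broadcast join in this session.
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

    val df = spark.sql(
      "SELECT /*+ BROADCAST(b) */ a.id, a.value FROM a JOIN b ON a.id = b.id")
    df.queryExecution.executedPlan.toString
  }
}
```

The returned plan text can be inspected for the join operator; with the hint in place one would expect a broadcast hash join even though the threshold is set to -1.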
Upvotes: 1
Reputation: 513
I found that this code works for a broadcast join in Spark 2.0.0 (built for Scala 2.11).
import org.apache.spark.sql.functions.broadcast
import context.implicits._ // context is the SQLContext; needed for toDF and the $"..." syntax
val employeesDF = employeesRDD.toDF
val departmentsDF = departmentsRDD.toDF
// mark the department data for broadcast (this is only a hint; nothing is materialized yet)
val tmpDepartments = broadcast(departmentsDF.as("departments"))
employeesDF.join(tmpDepartments,
$"depId" === $"id", // join by employees.depId == departments.id
"inner").show()
The reference for the above code is Henning Kropp's blog post, Broadcast Join with Spark.
Upvotes: 1
Reputation: 71
Setting spark.sql.autoBroadcastJoinThreshold = -1 will disable automatic broadcasting completely; it does not force a broadcast. See Other Configuration Options in the Spark SQL, DataFrames and Datasets Guide.
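The effect of the setting can be seen by planning the same unhinted join under two threshold values. This is only a sketch under assumptions: ThresholdDemo and the toy DataFrames are invented for the example, 10485760 bytes is the documented 10 MB default, and the default preference for sort-merge joins is assumed to be unchanged.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical helper, not part of Spark.
object ThresholdDemo {
  // Plans an unhinted join of two small DataFrames under the given threshold
  // and returns the physical plan as text.
  def planWithThreshold(spark: SparkSession, thresholdBytes: Long): String = {
    import spark.implicits._
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", thresholdBytes.toString)
    val left = (1 to 100).map(i => (i, s"l$i")).toDF("key", "l")
    val right = (1 to 100).map(i => (i, s"r$i")).toDF("key", "r")
    left.join(right, "key").queryExecution.executedPlan.toString
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("threshold-demo").getOrCreate()
    try {
      println(planWithThreshold(spark, 10485760L)) // small tables fit under 10 MB
      println(planWithThreshold(spark, -1L))       // automatic broadcasting disabled
    } finally spark.stop()
  }
}
```

Under the default threshold the small tables are eligible for a broadcast hash join; with -1 the planner falls back to a shuffle-based join such as a sort-merge join.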
Upvotes: 7
Reputation: 8008
You can hint that a DataFrame should be broadcast by using left.join(broadcast(right), ...)
Upvotes: 23
Reputation: 2747
This is a current limitation of Spark, see SPARK-6235. The 2GB limit also applies to broadcast variables.
Are you sure there is no other good way to do this, e.g. different partitioning?
Otherwise you can hack your way around it by manually creating multiple broadcast variables which are each <2GB.
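One way the workaround could look, purely as a sketch: split the lookup table into several smaller maps by key hash, broadcast each one separately, and route every lookup to the chunk that owns the key. ChunkedBroadcast and its methods are hypothetical names, the table is assumed to be an in-memory Map on the driver, and numChunks has to be chosen so that each serialized chunk stays under 2GB (hashing gives only roughly even chunk sizes).

```scala
import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast

// Hypothetical helper, not part of Spark.
object ChunkedBroadcast {
  // Chunk that owns a key: non-negative hash modulo the number of chunks.
  def chunkIndex[K](key: K, numChunks: Int): Int =
    Math.floorMod(key.##, numChunks)

  // Split one large lookup table into numChunks smaller maps.
  def split[K, V](table: Map[K, V], numChunks: Int): Vector[Map[K, V]] = {
    val grouped = table.groupBy { case (k, _) => chunkIndex(k, numChunks) }
    Vector.tabulate(numChunks)(i => grouped.getOrElse(i, Map.empty[K, V]))
  }

  // Create one broadcast variable per chunk.
  def broadcastAll[K, V](sc: SparkContext,
                         chunks: Vector[Map[K, V]]): Vector[Broadcast[Map[K, V]]] =
    chunks.map(chunk => sc.broadcast(chunk))

  // Look a key up in the chunk that owns it.
  def lookup[K, V](chunks: Vector[Broadcast[Map[K, V]]], key: K): Option[V] =
    chunks(chunkIndex(key, chunks.length)).value.get(key)
}
```

Inside an RDD transformation, a map-side join would then call ChunkedBroadcast.lookup(chunks, key) for each record of the large data set. This works at the RDD level with SparkContext.broadcast; it is not the DataFrame broadcast hint discussed in the other answers.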
Upvotes: 3