Reputation: 1771
If I have columns [a,b,c] in df1 and [a,b,c] in df2, and also a column d in both, where d = concat_ws('_', *[a,b,c]), would there be a performance difference between
df1.join(df2, [a,b,c])
and
df1.join(df2, d)
?
Upvotes: 1
Views: 3064
Reputation: 933
I'd suspect the join without the concatenation to be faster, because it's likely cheaper to hash the individual strings than to concatenate and then hash the result. The former involves fewer Java objects that need to be GC'd, but this isn't the full answer.
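As a rough illustration (a sketch only, not part of the question; it reuses the questioner's df1 and Spark's built-in hash function, which applies the same Murmur3 hash that hash partitioning uses):
import org.apache.spark.sql.functions.{col, concat_ws, hash}
// Hash the three columns directly: no intermediate string per row.
val direct = df1.select(hash(col("a"), col("b"), col("c")).as("h"))
// Hash the concatenation: builds an "a_b_c" string per row first,
// then hashes it -- extra object creation and GC work.
val viaConcat = df1.select(hash(concat_ws("_", col("a"), col("b"), col("c"))).as("h"))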
Keep in mind that this may not be the performance-limiting step of your query, in which case either approach would be just as fast. When it comes to performance tuning, it's best to test rather than guess without data.
Also, as mentioned above, leaving the columns unconcatenated gives the optimizer a chance to eliminate an exchange in the join if the input data is already partitioned correctly (see the sketch below).
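One way to give the optimizer that chance up front is to write both sides out bucketed by the join columns. This is only a sketch under assumptions not in the question (a SparkSession named spark, and t1/t2 as made-up table names):
// Bucket both sides by the join columns ahead of time.
df1.write.bucketBy(8, "a", "b", "c").sortBy("a", "b", "c").saveAsTable("t1")
df2.write.bucketBy(8, "a", "b", "c").sortBy("a", "b", "c").saveAsTable("t2")
// Joining the bucketed tables on a, b, c should then produce a plan
// without a shuffle Exchange on either side.
spark.table("t1").join(spark.table("t2"), Seq("a", "b", "c")).explain()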
Upvotes: 1
Reputation: 14845
The question cannot be answered with yes or no, as the answer depends on the details of the DataFrames.
The performance of a join depends to a large extent on how much shuffling is necessary to execute it. If both sides of the join are partitioned by the same column(s), the join will be faster. You can see the effect of partitioning by looking at the execution plan of the join.
We create two DataFrames df1 and df2 with the columns a, b, c and d:
val sparkSession = ...
// Disable broadcast joins so the example produces a sort-merge join
sparkSession.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
import sparkSession.implicits._
import org.apache.spark.sql.functions.{col, concat_ws}

val cols = Seq("a", "b", "c")
def createDf = (1 to 3).map(i => (i, i, i)).toDF(cols: _*)
  .withColumn("d", concat_ws("_", cols.map(col): _*))
val df1 = createDf
val df2 = createDf
df1 and df2 both look the same:
+---+---+---+-----+
| a| b| c| d|
+---+---+---+-----+
| 1| 1| 1|1_1_1|
| 2| 2| 2|2_2_2|
| 3| 3| 3|3_3_3|
+---+---+---+-----+
When we partition both DataFrames by column d and use this column as the join condition
df1.repartition(4, col("d")).join(df2.repartition(4, col("d")), "d").explain()
we get the execution plan
== Physical Plan ==
*(3) Project [d#13, a#7, b#8, c#9, a#25, b#26, c#27]
+- *(3) SortMergeJoin [d#13], [d#31], Inner
:- *(1) Sort [d#13 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(d#13, 4)
: +- LocalTableScan [a#7, b#8, c#9, d#13]
+- *(2) Sort [d#31 ASC NULLS FIRST], false, 0
+- ReusedExchange [a#25, b#26, c#27, d#31], Exchange hashpartitioning(d#13, 4)
Partitioning both DataFrames by d but joining over a, b and c
df1.repartition(4, col("d")).join(df2.repartition(4, col("d")), cols).explain()
leads to the execution plan
== Physical Plan ==
*(3) Project [a#7, b#8, c#9, d#13, d#31]
+- *(3) SortMergeJoin [a#7, b#8, c#9], [a#25, b#26, c#27], Inner
:- *(1) Sort [a#7 ASC NULLS FIRST, b#8 ASC NULLS FIRST, c#9 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(a#7, b#8, c#9, 200)
: +- Exchange hashpartitioning(d#13, 4)
: +- LocalTableScan [a#7, b#8, c#9, d#13]
+- *(2) Sort [a#25 ASC NULLS FIRST, b#26 ASC NULLS FIRST, c#27 ASC NULLS FIRST], false, 0
+- ReusedExchange [a#25, b#26, c#27, d#31], Exchange hashpartitioning(a#7, b#8, c#9, 200)
which contains one more Exchange hashpartitioning than the first plan. In this case, the join by a, b, c would be slower.
On the other hand, if the DataFrames are partitioned by a, b and c, the join by a, b, c would be faster than a join by d.
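To verify that symmetric case, the DataFrames above can be checked the same way (a quick sketch reusing cols and the setup code):
// Partition both sides by a, b, c and join on the same columns:
// the existing hashpartitioning should satisfy the join's
// distribution requirement, so no additional Exchange is added.
df1.repartition(4, cols.map(col): _*)
  .join(df2.repartition(4, cols.map(col): _*), cols)
  .explain()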
Upvotes: 1