jka.ne

Reputation: 1771

spark join performance: multiple column vs a single column

If I have columns [a, b, c] in both df1 and df2, plus a column d in both where d = concat_ws('_', *[a, b, c]), would there be a performance difference between:

  1. df1.join(df2, [a,b,c])
  2. df1.join(df2, d)

?

Upvotes: 1

Views: 3064

Answers (2)

Andrew Long

Reputation: 933

I'd suspect the join without the concatenation to be faster, because it's likely cheaper to hash the individual strings than to concatenate and then hash. The former also involves fewer Java objects that need to be GC'd. But this isn't the full answer.
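As a very rough illustration of the two strategies (using Spark's built-in hash function on df1 from the question; this only mirrors the idea and is not the exact code path the shuffle uses):

import org.apache.spark.sql.functions.{col, concat_ws, hash}

// Hash the individual columns directly vs. concatenate first and hash the result.
df1.select(
  hash(col("a"), col("b"), col("c")).as("hash_of_columns"),
  hash(concat_ws("_", col("a"), col("b"), col("c"))).as("hash_of_concat")
).show()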

Keep in mind that this may not be the performance-limiting step of your query, in which case either variant would be just as fast. When it comes to performance tuning, it's best to test rather than guess without data; see the timing sketch after the snippets below.

Also, as mentioned above, leaving the columns unconcatenated gives the optimizer a chance to eliminate an exchange on the join if the input data is already partitioned correctly.

// Option 1: join on the individual columns
df1.join(df2, Seq("a", "b", "c"))
// Option 2: join on the pre-concatenated column d
df1.join(df2, "d")
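
A minimal timing sketch, assuming df1 and df2 are defined as in the question (the time helper is a hypothetical convenience, not a Spark API):

// Hypothetical helper: run a block once and print its wall-clock time.
def time[T](label: String)(block: => T): T = {
  val start = System.nanoTime()
  val result = block
  println(s"$label: ${(System.nanoTime() - start) / 1e6} ms")
  result
}

// count() forces the join to actually execute.
time("join on a, b, c") { df1.join(df2, Seq("a", "b", "c")).count() }
time("join on d") { df1.join(df2, "d").count() }

Run each variant a few times on realistically sized data, since the first run also pays one-off costs such as reading the input.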

Upvotes: 1

werner

Reputation: 14845

The question cannot be answered with a simple yes or no, as the answer depends on the details of the DataFrames.

The performance of a join depends to a large extent on how much shuffling is necessary to execute it. If both sides of the join are already partitioned by the join column(s), the join will be faster, as a shuffle can be avoided. You can see the effect of the partitioning by looking at the execution plan of the join.

We create two DataFrames df1 and df2 with the columns a, b, c and d:

val sparkSession = ...
// Disable broadcast joins so both variants use a SortMergeJoin and the exchanges become visible.
sparkSession.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
import sparkSession.implicits._
import org.apache.spark.sql.functions.{col, concat_ws}
val cols = Seq("a", "b", "c")
def createDf = (1 to 3).map(i => (i, i, i)).toDF(cols: _*).withColumn("d", concat_ws("_", cols.map(col): _*))
val df1 = createDf
val df2 = createDf

df1 and df2 both look the same:

+---+---+---+-----+
|  a|  b|  c|    d|
+---+---+---+-----+
|  1|  1|  1|1_1_1|
|  2|  2|  2|2_2_2|
|  3|  3|  3|3_3_3|
+---+---+---+-----+

When we partition both DataFrames by column d and use this column as the join condition

df1.repartition(4, col("d")).join(df2.repartition(4, col("d")), "d").explain()

we get the execution plan

== Physical Plan ==
*(3) Project [d#13, a#7, b#8, c#9, a#25, b#26, c#27]
+- *(3) SortMergeJoin [d#13], [d#31], Inner
   :- *(1) Sort [d#13 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(d#13, 4)
   :     +- LocalTableScan [a#7, b#8, c#9, d#13]
   +- *(2) Sort [d#31 ASC NULLS FIRST], false, 0
      +- ReusedExchange [a#25, b#26, c#27, d#31], Exchange hashpartitioning(d#13, 4)

Partitioning both DataFrames by d but joining over a, b and c

df1.repartition(4, col("d")).join(df2.repartition(4, col("d")), cols).explain()

leads to the execution plan

== Physical Plan ==
*(3) Project [a#7, b#8, c#9, d#13, d#31]
+- *(3) SortMergeJoin [a#7, b#8, c#9], [a#25, b#26, c#27], Inner
   :- *(1) Sort [a#7 ASC NULLS FIRST, b#8 ASC NULLS FIRST, c#9 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(a#7, b#8, c#9, 200)
   :     +- Exchange hashpartitioning(d#13, 4)
   :        +- LocalTableScan [a#7, b#8, c#9, d#13]
   +- *(2) Sort [a#25 ASC NULLS FIRST, b#26 ASC NULLS FIRST, c#27 ASC NULLS FIRST], false, 0
      +- ReusedExchange [a#25, b#26, c#27, d#31], Exchange hashpartitioning(a#7, b#8, c#9, 200)

which contains one more Exchange hashpartitioning than the first plan. In this case the join over a, b and c would be slower.

On the other hand, if the DataFrames are partitioned by a, b and c, a join over a, b and c would be faster than a join over d.
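
Following the same pattern as above, a quick sketch of that converse case (same df1 and df2 as before): pre-partitioning both sides by a, b and c lets the join over those columns reuse the existing exchange, while a join over d adds one more Exchange hashpartitioning.

// Both sides pre-partitioned by a, b, c; the join over a, b, c can reuse this partitioning.
df1.repartition(4, cols.map(col): _*).join(df2.repartition(4, cols.map(col): _*), cols).explain()

// Joining over d instead forces an additional Exchange hashpartitioning(d, ...) on both sides.
df1.repartition(4, cols.map(col): _*).join(df2.repartition(4, cols.map(col): _*), "d").explain()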

Upvotes: 1
