Yann Moisan

Reputation: 8281

Why does Spark perform an unnecessary shuffle during a joinWith on a pre-partitioned dataframe?

This example has been tested with Spark 2.4.x. Let's consider two simple DataFrames:

import org.apache.spark.sql.functions.{collect_list, struct}

case class Event(user_id: String, v: String)
case class User(user_id: String, name: String)

val events = spark.createDataFrame(Seq(Event("u1", "u1.1"), Event("u1", "u1.2"), Event("u2", "u2.1")))
val users  = spark.createDataFrame(Seq(User("u1", "name1"), User("u2", "name2"), User("u3", "name3")))

When there is a groupBy followed by a joinWith on the same column, Spark performs an unnecessary shuffle:

events
  .groupBy("user_id").agg(collect_list(struct("v")).alias("values"))
  .joinWith(users, events("user_id") === users("user_id"), "full_outer")
  .show

When doing the same thing with a join, the extra shuffle disappears:

events
  .groupBy("user_id").agg(collect_list(struct("v")).alias("values"))
  .join(users, events("user_id") === users("user_id"), "full_outer")
  .show

Is this a known issue? Is there a workaround to use joinWith without the extra shuffle?

Appendix: here are the SQL plans.

[Screenshot: SQL plan for the joinWith query, showing the extra shuffle]

[Screenshot: SQL plan for the join query, without the extra shuffle]
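Since the screenshots are not reproduced here, here is a minimal sketch for regenerating the plans yourself (assuming the same events and users DataFrames defined above):

// joinWith variant: an extra Exchange (shuffle) shows up in the physical plan
events
  .groupBy("user_id").agg(collect_list(struct("v")).alias("values"))
  .joinWith(users, events("user_id") === users("user_id"), "full_outer")
  .explain(true)

// join variant: no extra Exchange after the Aggregate on the events side
events
  .groupBy("user_id").agg(collect_list(struct("v")).alias("values"))
  .join(users, events("user_id") === users("user_id"), "full_outer")
  .explain(true)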

Upvotes: 3

Views: 104

Answers (1)

BlueSheepToken

Reputation: 6099

This looks like a regression introduced by a bug fix in this ticket: the results were wrong for outer joins, hence the need to add a Project node (which packs the columns into a struct) before the Join node.

However, we end up with this kind of query plan:

Join FullOuter, (_1#122.user_id = _2#123.user_id)
:- Project [named_struct(user_id, user_id#4, values, values#115) AS _1#122]
:  +- Aggregate [user_id#4], [user_id#4, collect_list(struct(v, v#5), 0, 0) AS values#115]
:     +- LocalRelation [user_id#4, v#5]
+- LocalRelation [_2#123]

The Project node prevents Catalyst from optimizing away the extra shuffle: once the aggregated columns are packed into a struct, the join keys no longer line up with the aggregate's output partitioning, so Spark re-shuffles.
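A minimal sketch for inspecting that plan yourself (assuming the same events and users DataFrames from the question):

val joined = events
  .groupBy("user_id").agg(collect_list(struct("v")).alias("values"))
  .joinWith(users, events("user_id") === users("user_id"), "full_outer")

// The optimized logical plan still contains the Project that packs the
// aggregated columns into a named_struct right before the Join.
println(joined.queryExecution.optimizedPlan.numberedTreeString)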

However, with this PR the Project node has been removed for inner joins, and with it the extra shuffle.
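If your Spark version includes that PR, a quick way to check is to run the same query as an inner joinWith and compare plans; a sketch, assuming the same DataFrames as above:

// With the fix, the Project is gone for inner joins, so the aggregate's
// partitioning can be reused and the extra Exchange should not appear.
events
  .groupBy("user_id").agg(collect_list(struct("v")).alias("values"))
  .joinWith(users, events("user_id") === users("user_id"), "inner")
  .explain(true)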

Upvotes: 2
