Reputation: 8281
This example has been tested with Spark 2.4.x. Let's consider two simple DataFrames (with the import needed for the functions used below):
import org.apache.spark.sql.functions.{collect_list, struct}

case class Event(user_id: String, v: String)
case class User(user_id: String, name: String)

val events = spark.createDataFrame(Seq(Event("u1", "u1.1"), Event("u1", "u1.2"), Event("u2", "u2.1")))
val users = spark.createDataFrame(Seq(User("u1", "name1"), User("u2", "name2"), User("u3", "name3")))
When a groupBy is followed by a joinWith on the same column, Spark performs an unnecessary shuffle:
events
  .groupBy("user_id").agg(collect_list(struct("v")).alias("values"))
  .joinWith(users, events("user_id") === users("user_id"), "full_outer")
  .show
When doing the same thing with a join, the extra shuffle disappears:
events
  .groupBy("user_id").agg(collect_list(struct("v")).alias("values"))
  .join(users, events("user_id") === users("user_id"), "full_outer")
  .show
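The difference can be checked by comparing the physical plans with explain (each Exchange node in the output is a shuffle):

events
  .groupBy("user_id").agg(collect_list(struct("v")).alias("values"))
  .joinWith(users, events("user_id") === users("user_id"), "full_outer")
  .explain // the plan contains an extra Exchange below the join

events
  .groupBy("user_id").agg(collect_list(struct("v")).alias("values"))
  .join(users, events("user_id") === users("user_id"), "full_outer")
  .explain // no extra Exchange: the aggregation's partitioning is reused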
Is this a known issue? Is there a workaround that allows using joinWith without the extra shuffle?
Upvotes: 3
Views: 104
Reputation: 6099
This seems like a regression introduced by a bug fix in this ticket: the result was wrong for outer joins, hence the need to add a Project node (the packing of the struct) before the Join node.
However, we end up with this kind of query plan:
Join FullOuter, (_1#122.user_id = _2#123.user_id)
:- Project [named_struct(user_id, user_id#4, values, values#115) AS _1#122]
: +- Aggregate [user_id#4], [user_id#4, collect_list(struct(v, v#5), 0, 0) AS values#115]
: +- LocalRelation [user_id#4, v#5]
+- LocalRelation [_2#123]
The Project node prevents Catalyst from eliminating the extra shuffle. However, with this PR, the Project node has been removed for inner joins, and so has the extra shuffle.
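As for a workaround, here is a sketch of mine (not covered by the ticket): keep the plain join, which stays shuffle-free, and rebuild the pair-of-structs shape that joinWith would have produced. Since this Project comes after the Join rather than before it, it should not block the optimization:

val aggregated = events
  .groupBy("user_id").agg(collect_list(struct("v")).alias("values"))

aggregated
  .join(users, aggregated("user_id") === users("user_id"), "full_outer")
  .select(
    struct(aggregated("user_id"), aggregated("values")).alias("_1"),
    struct(users("user_id"), users("name")).alias("_2")
  )
  .show

Note that the null semantics differ slightly: for a non-matching row, joinWith yields a null struct on the missing side, while this version yields a struct whose fields are all null.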
Upvotes: 2