voiddrum

Reputation: 93

Spark Dataset joinWith API giving wrong results

This is a small test case that reproduces an issue I am seeing with a join in my code:

import spark.implicits._

case class B(b1: String, b2: Int)
val B1 = B("One", 1)
val B2 = B("Two", 2)
val dsB = spark.createDataset(Seq(B1, B2))
dsB.show()
+---+---+
| b1| b2|
+---+---+
|One|  1|
|Two|  2|
+---+---+
val m = Map(1->"Van")
val mapget = spark.udf.register("mapget", (b: Int) => m.get(b))
val dsB1 = dsB.withColumn("b2", mapget(dsB("b2"))).where("b2 is not null")
dsB1.show()
+---+---+
| b1| b2|
+---+---+
|One|Van|
+---+---+
val j = dsB1.joinWith(dsB, dsB1("b1") === dsB("b1"), "inner")
j.show()
+----------+--------+
|        _1|      _2|
+----------+--------+
|[One, Van]|[One, 1]|
|[One, Van]|[Two, 2]|
+----------+--------+

The joinWith result is not right; it is essentially a cross product. Any clue what the problem is? I have verified that the join API works fine:

val j = dsB1.join(dsB, dsB1("b1") === dsB("b1"), "inner")
j.show()
+---+---+---+---+
| b1| b2| b1| b2|
+---+---+---+---+
|One|Van|One|  1|
+---+---+---+---+

Upvotes: 0

Views: 374

Answers (1)

Grisha Weintraub

Reputation: 7986

It looks like you are using a pretty old Spark version. On Spark 2.4.4, I get the following exception when running your example:

org.apache.spark.sql.AnalysisException: Detected implicit cartesian product for INNER join between logical plans
LocalRelation [_1#55]
and
LocalRelation [_2#56]
Join condition is missing or trivial.

The reason is that dsB1 is derived from dsB, so dsB1("b1") resolves to the very same column as dsB("b1"). The join condition therefore compares dsB("b1") to itself, which is always true.
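You can see this by printing the plan (a sketch; the exact plan output varies by Spark version):

val j = dsB1.joinWith(dsB, dsB1("b1") === dsB("b1"), "inner")
// explain() prints the query plan; the join condition shows the same
// attribute (e.g. b1#N) on both sides, i.e. it is trivially true,
// which is why the result degenerates into a cross product.
j.explain()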

A simple solution is to rename the column on one side before the join, something like this:

val dsB1 = dsB.withColumn("b2", mapget(dsB("b2"))).where("b2 is not null").withColumnRenamed("b1", "b1_2")
val j = dsB1.joinWith(dsB, dsB1("b1_2") === dsB("b1"), "inner")
j.show
+----------+--------+
|        _1|      _2|
+----------+--------+
|[One, Van]|[One, 1]|
+----------+--------+
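If you prefer not to rename, another option is to alias both sides and qualify the join columns (a sketch, starting again from the original dsB1 without the rename; column resolution through dataset aliases may behave slightly differently across Spark versions):

import org.apache.spark.sql.functions.col

val dsB1 = dsB.withColumn("b2", mapget(dsB("b2"))).where("b2 is not null")
// Aliasing gives Spark distinct qualifiers for the two relations,
// even though dsB1 is derived from dsB.
val j = dsB1.as("l").joinWith(dsB.as("r"), col("l.b1") === col("r.b1"), "inner")
j.show()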

Upvotes: 4
