Reputation: 93
This is a small test case that reproduces an issue I am seeing with a join in my code:
case class B(b1: String, b2: Int)
val B1 = B("One", 1)
val B2 = B("Two", 2)
val dsB = spark.createDataset(Seq(B1, B2))
dsB.show()
+---+---+
| b1| b2|
+---+---+
|One|  1|
|Two|  2|
+---+---+
val m = Map(1->"Van")
val mapget = spark.udf.register("mapget", (b: Int) => m.get(b))
val dsB1 = dsB.withColumn("b2", mapget(dsB("b2"))).where("b2 is not null")
dsB1.show()
+---+---+
| b1| b2|
+---+---+
|One|Van|
+---+---+
val j = dsB1.joinWith(dsB, dsB1("b1") === dsB("b1"), "inner")
j.show()
+----------+--------+
|        _1|      _2|
+----------+--------+
|[One, Van]|[One, 1]|
|[One, Van]|[Two, 2]|
+----------+--------+
The joinWith result is not right: it is essentially doing a cross product. Any clue what the problem is? I have verified that the join API works fine:
val j = dsB1.join(dsB, dsB1("b1") === dsB("b1"), "inner")
j.show()
+---+---+---+---+
| b1| b2| b1| b2|
+---+---+---+---+
|One|Van|One|  1|
+---+---+---+---+
Upvotes: 0
Views: 374
Reputation: 7986
It looks like you are using a fairly old Spark version. On Spark 2.4.4, I get the following exception when running your example:
org.apache.spark.sql.AnalysisException: Detected implicit cartesian product for INNER join between logical plans
LocalRelation [_1#55]
and
LocalRelation [_2#56]
Join condition is missing or trivial.
The reason is that dsB1 is derived from dsB, so dsB1("b1") resolves to the same underlying column as dsB("b1"). The join condition therefore compares dsB("b1") to itself, which is always true.
A trivial solution would be to rename the column, something like this:
val dsB1 = dsB.withColumn("b2", mapget(dsB("b2"))).where("b2 is not null").withColumnRenamed("b1", "b1_2")
val j = dsB1.joinWith(dsB, dsB1("b1_2") === dsB("b1"), "inner")
j.show
+----------+--------+
|        _1|      _2|
+----------+--------+
|[One, Van]|[One, 1]|
+----------+--------+
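If renaming the column is inconvenient, another option that is often used for joins between a Dataset and something derived from it is to alias each side and refer to the columns through the alias, so the two sides of the condition no longer resolve to the same attribute. The following is only a sketch of that idea, not something verified against your Spark version: the alias names "l" and "r" are arbitrary, and the local SparkSession is only there to make the snippet self-contained (in spark-shell, `spark` already exists).

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// Same shape as the case class in the question.
case class B(b1: String, b2: Int)

// Assumption: a local session, built here only so the sketch is self-contained.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("joinWith-alias-sketch")
  .getOrCreate()
import spark.implicits._

val dsB = Seq(B("One", 1), B("Two", 2)).toDS()

// Same UDF and filter as in the question.
val m = Map(1 -> "Van")
val mapget = spark.udf.register("mapget", (b: Int) => m.get(b))
val dsB1 = dsB.withColumn("b2", mapget(dsB("b2"))).where("b2 is not null")

// Alias both sides ("l" and "r" are hypothetical names) and build the
// condition from the qualified column names instead of dsB1("b1") / dsB("b1").
val j = dsB1.as("l").joinWith(dsB.as("r"), col("l.b1") === col("r.b1"), "inner")
j.show()
```

With the aliases in place, the condition should be resolved against the two sides separately, so I would expect only the pair for "One" to come back, as with the rename.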
Upvotes: 4