219CID

Reputation: 440

Scala/Spark join on a second key if the first key doesn't exist in one of the dataframes

I have two dataframes:

 RegionValues:
+-----------+----------+----------------------+
|marketplace|primary_id|values                |
+-----------+----------+----------------------+
|xyz        |0000000001|[cat, dog, cow]       |
|reg        |PRT0000001|[hippo, dragon, moose]|
|asz        |0000001333|[mouse, rhino, lion]  |
+-----------+----------+----------------------+

Marketplace:
 
+----------+-----------+----------+
|primary_id|marketplace|parent_id |
+----------+-----------+----------+
|0000000001|xyz        |PRT0000001|
|0000000002|wrt        |PRT0000001|
|PRT0000001|reg        |PRT0000001|
|PRT00MISS0|asz        |PRT00MISS0|
|000000000B|823        |PRT0000002|
+----------+-----------+----------+

When I join the dataframes together, I want to join them on the primary_id value, but if the primary_id is not present in the RegionValues dataframe, I want to fall back to joining on parent_id === primary_id. So my desired output would be:

+----------+--------------+-----------+-------------------------------------+
|primary_id|marketplace   |parent_id  |values                               |
+----------+--------------+-----------+-------------------------------------+
|0000000001|...           |PRT0000001 |[cat, dog, cow]                      |
|0000000002|...           |PRT0000001 |[hippo, dragon, moose]               |
|PRT0000001|...           |PRT0000001 |[hippo, dragon, moose]               |
|PRT00MISS0|              |PRT00MISS0 |null                                 |
|0000001333|              |0000001333 |[mouse, rhino, lion]                 |
|000000000B|              |PRT0000002 |null                                 |
+----------+--------------+-----------+-------------------------------------+

Note that 0000000001 kept its original values, but 0000000002 took on its parent_id's values since it's not present in RegionValues. Is it possible to accomplish this logic within a single join statement? I am using Scala and Spark.

I have tried a join condition like this, but it results in null values for 0000000002:

val parentIdJoinCondition = when(
    (regionValuesDf.col("primary_id") === marketplaceDf.col("primary_id")).isNull,
    marketplaceDf.col("parent_id") === regionValuesDf.col("primary_id")
  ).otherwise(regionValuesDf.col("primary_id") === marketplaceDf.col("primary_id"))

val joinedDf = regionValuesDf.join(
    marketplaceDf,
    parentIdJoinCondition,
    "outer"
)

I think I could get my desired result by using 3 distinct joins but this seems unnecessary and harder to read.
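For reference, a rough, untested sketch of what I mean by the three-join version:

val region = regionValuesDf.drop("marketplace") // avoid duplicate column names

// 1. rows whose primary_id matches RegionValues directly
val byPrimary = marketplaceDf.join(region, Seq("primary_id"), "inner")

// 2. rows with no direct match...
val noDirectMatch = marketplaceDf.join(region, Seq("primary_id"), "left_anti")

// 3. ...joined again through parent_id instead
val byParent = noDirectMatch.join(
    region.withColumnRenamed("primary_id", "parent_id"),
    Seq("parent_id"),
    "left"
)

val combined = byPrimary.unionByName(byParent)
// (RegionValues rows absent from Marketplace, like 0000001333, would still
// have to be appended separately to match the desired output above.)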

Upvotes: 1

Views: 832

Answers (2)

Vincent Doba

Reputation: 5068

Creating custom conditions like this will result in Spark performing a cross join, which is a very inefficient way to join. Moreover, Spark has no way of knowing that a key has no match before actually performing the join, so your condition (regionValuesDf.col("primary_id") === marketplaceDf.col("primary_id")).isNull will always return false for non-null keys.
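To see why, remember that a join condition is evaluated independently for every candidate pair of rows; there is no per-key "missing" state it can observe. A toy illustration (hypothetical data, assuming a SparkSession named spark):

import spark.implicits._

val left = Seq("a", "b").toDF("k")
val right = Seq("a", "c").toDF("k2")

// (left("k") === right("k2")).isNull is true only when k or k2 is itself NULL
// in a given row pair, never when a key merely has no match, so with non-null
// keys this condition is false for every pair and the join matches nothing:
left.join(right, (left("k") === right("k2")).isNull, "outer").show()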

So, as you correctly guessed, the best solution is to perform several joins, and you can get away with two: a first join to determine whether primary_id or parent_id should be used as the key for the outer join, then the actual outer join. Finally, you merge the primary_id, marketplace and parent_id columns and drop the helper columns.

So the code would be:

import org.apache.spark.sql.functions.{coalesce, col, when}

val joinedDf = marketplaceDf
  // first join: find out which rows have a direct match on primary_id
  .join(regionDf.drop("marketplace"), Seq("primary_id"), "left")
  // use primary_id as key where it matched, otherwise fall back to parent_id
  .withColumn("join_key", when(col("values").isNotNull, col("primary_id")).otherwise(col("parent_id")))
  .drop("values")
  // second join: the actual outer join on the chosen key
  .join(
    regionDf
      .withColumnRenamed("primary_id", "join_key")
      .withColumnRenamed("marketplace", "region_marketplace"),
    Seq("join_key"),
    "outer"
  )
  // fill in columns for rows that exist only in regionDf
  .withColumn("primary_id", coalesce(col("primary_id"), col("join_key")))
  .withColumn("parent_id", coalesce(col("parent_id"), col("join_key")))
  .withColumn("marketplace", coalesce(col("marketplace"), col("region_marketplace")))
  .drop("join_key", "region_marketplace")

That gives you the following joinedDf dataframe:

+----------+-----------+----------+----------------------+
|primary_id|marketplace|parent_id |values                |
+----------+-----------+----------+----------------------+
|0000000001|xyz        |PRT0000001|[cat, dog, cow]       |
|0000001333|asz        |0000001333|[mouse, rhino, lion]  |
|0000000002|wrt        |PRT0000001|[hippo, dragon, moose]|
|PRT0000001|reg        |PRT0000001|[hippo, dragon, moose]|
|000000000B|823        |PRT0000002|null                  |
|PRT00MISS0|asz        |PRT00MISS0|null                  |
+----------+-----------+----------+----------------------+

Upvotes: 2

Sanket9394

Reputation: 2091

Shouldn't regionValuesDf.col("primary_id") =!= marketplaceDf.col("primary_id") instead of (regionValuesDf.col("primary_id") === marketplaceDf.col("primary_id")).isNull in your join statement help?
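Spelled out, that suggestion would read (untested sketch, reusing the question's dataframe names):

import org.apache.spark.sql.functions.when

// fall back to parent_id whenever the primary_id values differ
val altCondition = when(
    regionValuesDf.col("primary_id") =!= marketplaceDf.col("primary_id"),
    marketplaceDf.col("parent_id") === regionValuesDf.col("primary_id")
  ).otherwise(regionValuesDf.col("primary_id") === marketplaceDf.col("primary_id"))

val joinedDf = regionValuesDf.join(marketplaceDf, altCondition, "outer")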

Upvotes: 0
