Dariusz Krynicki

Reputation: 2718

spark scala conditional join replace null values

I have two dataframes. I want to replace the values in col1 of df1 that are null with the values from col1 of df2. Please keep in mind that df1 can have more than 10^6 rows, and so can df2, and that df1 has some additional columns that differ from the additional columns of df2.

I know how to do a join, but I do not know how to do this kind of conditional join in Spark with Scala.

df1

name   | col1 | col2 | col3
----------------------------
foo    | 0.1  | ...
bar    | null |
hello  | 0.6  |
foobar | null |


df2 

name   | col1 | col7
--------------------
lorem  | 0.1  |
bar    | 0.52 |
foobar | 0.47 |

EDIT:

This is my current solution:

df1.select("name", "col2", "col3").join(df2, (df1("name") === df2("name")), "left").select(df1("name"), col("col1"))

EDIT2:

val df1 = Seq(
  ("foo", Seq(0.1), 10, "a"),
  ("bar", Seq(), 20, "b"),
  ("hello", Seq(0.1), 30, "c"),
  ("foobar", Seq(), 40, "d")
).toDF("name", "col1", "col2", "col3")

val df2 = Seq(
  ("lorem", Seq(0.1), "x"),
  ("bar", Seq(0.52), "y"),
  ("foobar", Seq(0.47), "z")
).toDF("name", "col1", "col7")

display(df1.
  join(df2, Seq("name"), "left_outer").
  select(df1("name"), coalesce(df1("col1"), df2("col1")).as("col1")))

returns:

name   | col1
bar    | []
foo    | [0.1]
foobar | []
hello  | [0.1]

Upvotes: 0

Views: 946

Answers (1)

Leo C

Reputation: 22439

Consider using coalesce on col1 after performing the left join. To handle both nulls and empty arrays (in the case of ArrayType), as per the revised requirement in the comments section, a when/otherwise clause is used, as shown below:

val df1 = Seq(
  ("foo",    Some(Seq(0.1)), 10, "a"),
  ("bar",    None,           20, "b"),
  ("hello",  Some(Seq(0.1)), 30, "c"),
  ("foobar", Some(Seq()),    40, "d")
).toDF("name", "col1", "col2", "col3")

val df2 = Seq(
  ("lorem",  Seq(0.1),  "x"),
  ("bar",    Seq(0.52), "y"),
  ("foobar", Seq(0.47), "z")
).toDF("name", "col1", "col7")

df1.
  join(df2, Seq("name"), "left_outer").
  select(
    df1("name"),
    coalesce(
      when(lit(df1.schema("col1").dataType.typeName) === "array" && size(df1("col1")) === 0, df2("col1")).otherwise(df1("col1")), 
      df2("col1")
    ).as("col1")
  ).
  show
/*
+------+------+
|  name|  col1|
+------+------+
|   foo| [0.1]|
|   bar|[0.52]|
| hello| [0.1]|
|foobar|[0.47]|
+------+------+
*/

UPDATE:

It appears that Spark, surprisingly, does not handle conditionA && conditionB the way most other languages do: even when conditionA is false, conditionB will still be evaluated, and replacing && with a nested when/otherwise does not resolve the issue either. It might be due to limitations in how the internally translated case/when/else SQL is executed.

As a result, the above when/otherwise data-type check via the array-specific function size() fails when col1 is not of ArrayType. Given that, I would forgo the dynamic column-type check and run different queries depending on whether col1 is an ArrayType column, assuming that is known upfront:

df1.
  join(df2, Seq("name"), "left_outer").
  select(
    df1("name"),
    coalesce(
      when(size(df1("col1")) === 0, df2("col1")).otherwise(df1("col1")),  // <-- if col1 is an array
      // df1("col1"),  // <-- if col1 is not an array
      df2("col1")
    ).as("col1")
  ).
  show
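
For completeness, the commented-out non-array branch above amounts to a plain coalesce. Below is a minimal sketch, assuming col1 is a simple nullable Double as in the question's first tables; the df1Plain/df2Plain names are hypothetical stand-ins, spark.implicits._ and org.apache.spark.sql.functions._ are assumed to be in scope, and df1's extra columns are simply carried through:

// Sketch only: non-array col1 (nullable Double); df1Plain/df2Plain are hypothetical stand-ins
val df1Plain = Seq(
  ("foo",    Some(0.1), 10, "a"),
  ("bar",    None,      20, "b"),
  ("hello",  Some(0.6), 30, "c"),
  ("foobar", None,      40, "d")
).toDF("name", "col1", "col2", "col3")

val df2Plain = Seq(
  ("lorem",  0.1,  "x"),
  ("bar",    0.52, "y"),
  ("foobar", 0.47, "z")
).toDF("name", "col1", "col7")

df1Plain.
  join(df2Plain, Seq("name"), "left_outer").
  select(
    df1Plain("name"),
    coalesce(df1Plain("col1"), df2Plain("col1")).as("col1"),  // null falls back to df2's value
    df1Plain("col2"),
    df1Plain("col3")
  ).
  show

Here a null in df1Plain("col1") is the only "empty" case, so coalesce alone is enough and no size()-based check is needed.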

Upvotes: 2
