Reputation: 2718
I have two dataframes. I want to replace values in col1 of df1 where they are null, using the values from col1 of df2. Please keep in mind that df1 can have > 10^6 rows (and so can df2), and that df1 has some additional columns which differ from the additional columns of df2.
I know how to do a join, but I do not know how to express this kind of conditional join in Spark with Scala.
df1
name   | col1 | col2 | col3
---------------------------
foo    | 0.1  | ...
bar    | null |
hello  | 0.6  |
foobar | null |
df2
name   | col1 | col7
--------------------
lorem  | 0.1  |
bar    | 0.52 |
foobar | 0.47 |
EDIT:
This is my current solution:
df1.select("name", "col2", "col3").
  join(df2, df1("name") === df2("name"), "left").
  select(df1("name"), col("col1"))
EDIT2:
val df1 = Seq(
  ("foo", Seq(0.1), 10, "a"),
  ("bar", Seq(), 20, "b"),
  ("hello", Seq(0.1), 30, "c"),
  ("foobar", Seq(), 40, "d")
).toDF("name", "col1", "col2", "col3")

val df2 = Seq(
  ("lorem", Seq(0.1), "x"),
  ("bar", Seq(0.52), "y"),
  ("foobar", Seq(0.47), "z")
).toDF("name", "col1", "col7")
display(df1.
  join(df2, Seq("name"), "left_outer").
  select(df1("name"), coalesce(df1("col1"), df2("col1")).as("col1")))
returns:
name   | col1
-------------
bar    | []
foo    | [0.1]
foobar | []
hello  | [0.1]
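Presumably the empty arrays survive because coalesce only falls back when its first argument is null, and Seq() produces an empty (but non-null) array. A minimal sketch of that distinction, with made-up columns a and b for illustration:

import org.apache.spark.sql.functions._

// coalesce picks the first non-null argument; an empty array is not null,
// so it is kept as-is and the fallback column is never consulted
Seq((Seq.empty[Double], Seq(0.52))).toDF("a", "b").
  select(coalesce(col("a"), col("b")).as("c")).
  show
// yields [] rather than [0.52]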
Upvotes: 0
Views: 946
Reputation: 22439
Consider using coalesce on col1 after performing the left join. To handle both nulls and empty arrays (in the case of ArrayType), as per the revised requirement in the comments section, a when/otherwise clause is used, as shown below:
val df1 = Seq(
  ("foo", Some(Seq(0.1)), 10, "a"),
  ("bar", None, 20, "b"),
  ("hello", Some(Seq(0.1)), 30, "c"),
  ("foobar", Some(Seq()), 40, "d")
).toDF("name", "col1", "col2", "col3")

val df2 = Seq(
  ("lorem", Seq(0.1), "x"),
  ("bar", Seq(0.52), "y"),
  ("foobar", Seq(0.47), "z")
).toDF("name", "col1", "col7")
df1.
  join(df2, Seq("name"), "left_outer").
  select(
    df1("name"),
    coalesce(
      when(lit(df1.schema("col1").dataType.typeName) === "array" && size(df1("col1")) === 0, df2("col1")).
        otherwise(df1("col1")),
      df2("col1")
    ).as("col1")
  ).
  show
/*
+------+------+
| name| col1|
+------+------+
| foo| [0.1]|
| bar|[0.52]|
| hello| [0.1]|
|foobar|[0.47]|
+------+------+
*/
UPDATE:
It appears that Spark, surprisingly, does not handle conditionA && conditionB the way most host languages do: even when conditionA is false, conditionB is still evaluated, and replacing && with a nested when/otherwise would not resolve the issue either. The most likely cause is that Spark's analyzer type-checks the entire expression tree before any row is evaluated, so an array-specific function applied to a non-array column fails at analysis time regardless of any guard condition.
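A minimal repro of the analysis-time failure, on a hypothetical dataframe where col1 is not an array (note the guard is literally false, yet the query still fails before touching any rows):

import org.apache.spark.sql.functions._

val df = Seq(("a", 1.0)).toDF("name", "col1")  // col1 is DoubleType, not ArrayType
df.select(when(lit(false) && size(df("col1")) === 0, lit(0.0)).otherwise(df("col1"))).show
// throws an AnalysisException: size() requires an array (or map) argument --
// the analyzer rejects the expression before execution ever starts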
As a result, the above when/otherwise data-type check via the array-specific function size() fails when col1 is non-ArrayType. Given that, I would forgo the dynamic column-type check and issue different queries based on whether col1 is ArrayType or not, assuming that is known upfront:
df1.
  join(df2, Seq("name"), "left_outer").
  select(
    df1("name"),
    coalesce(
      when(size(df1("col1")) === 0, df2("col1")).otherwise(df1("col1")),  // <-- if col1 is an array
      // df1("col1"),  // <-- if col1 is not an array
      df2("col1")
    ).as("col1")
  ).
  show
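For a plain (non-array) col1, the commented-out branch above reduces to the simple coalesce already shown in the question's EDIT2, since null is then the only missing-value marker to handle:

df1.
  join(df2, Seq("name"), "left_outer").
  select(df1("name"), coalesce(df1("col1"), df2("col1")).as("col1")).
  show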
Upvotes: 2