Reputation: 5541
I've got a Dataset<Row> with the following structure:
{"name": "Ben",
"lastHolidayDestination": "Florida",
"holidays": [
{"destination": "Florida",
"year": 2020},
{"destination": "Lille",
"year": 2019}
]}
I want to add a new column lastHolidayYear to the root of the Dataset using Spark SQL, populated by finding the holidays element whose destination matches lastHolidayDestination (assume there will only ever be one). So the output Dataset would be:
{"name": "Ben",
"lastHolidayDestination": "Florida",
"lastHolidayYear": 2020,
"holidays": [
{"destination": "Florida",
"year": 2020},
{"destination": "Lille",
"year": 2019}
]}
I've been playing around with dataset.withColumn() and when() (using Java, but Scala/Python answers are fine) but I've got nowhere so far. I really don't want to use a UDF unless I have to. Any suggestions?
Upvotes: 2
Views: 2261
Reputation: 5078
Since Spark 3.0, you can first filter the array and then get the first element of the array with the following expression:
import org.apache.spark.sql.functions.{element_at, filter, col}
val extractElementExpr = element_at(filter(col("myArrayColumnName"), myCondition), 1)
Where "myArrayColumnName" is the name of the column containing the array and myCondition is the condition, a Column => Column expression.
For your specific example, the code is:
import org.apache.spark.sql.functions.{col, element_at, filter}
import org.apache.spark.sql.Column
val isLastHoliday = (c: Column) => c.getField("destination") === col("lastHolidayDestination")
val getLastHoliday = element_at(filter(col("holidays"), isLastHoliday), 1)
val result = df.withColumn("lastHolidayYear", getLastHoliday.getField("year"))
With this code, if your input dataframe contains the following values:
+------+----------------------+--------------------------------+
|name |lastHolidayDestination|holidays |
+------+----------------------+--------------------------------+
|Ben |Florida |[[Florida, 2020], [Lille, 2019]]|
|Alice |Peru |[[Florida, 2020], [Lille, 2019]]|
|Robert|Lille |[[Florida, 2020], [Lille, 2019]]|
+------+----------------------+--------------------------------+
Output will be:
+------+----------------------+--------------------------------+---------------+
|name |lastHolidayDestination|holidays |lastHolidayYear|
+------+----------------------+--------------------------------+---------------+
|Ben |Florida |[[Florida, 2020], [Lille, 2019]]|2020 |
|Alice |Peru |[[Florida, 2020], [Lille, 2019]]|null |
|Robert|Lille |[[Florida, 2020], [Lille, 2019]]|2019 |
+------+----------------------+--------------------------------+---------------+
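For readers without a Spark shell handy, here is a plain-Python sketch (not Spark code; the dicts and helper name are illustrative) of what filter followed by element_at(…, 1).getField("year") computes per row, using the same three rows as the table above. When the filtered array is empty, the result is null, which is why Alice gets null:

```python
# Plain-Python analogue of Spark's filter + element_at(..., 1).getField("year"):
# keep the array elements matching the condition, then take the first (or None).
holidays = [{"destination": "Florida", "year": 2020},
            {"destination": "Lille", "year": 2019}]

rows = [
    {"name": "Ben", "lastHolidayDestination": "Florida"},
    {"name": "Alice", "lastHolidayDestination": "Peru"},
    {"name": "Robert", "lastHolidayDestination": "Lille"},
]

def last_holiday_year(row):
    # corresponds to filter(col("holidays"), isLastHoliday)
    matches = [h for h in holidays
               if h["destination"] == row["lastHolidayDestination"]]
    # corresponds to element_at(..., 1).getField("year"):
    # element_at on an out-of-range index yields null, modeled here as None
    return matches[0]["year"] if matches else None

for row in rows:
    row["lastHolidayYear"] = last_holiday_year(row)
```

Running this yields 2020 for Ben, None for Alice, and 2019 for Robert, matching the output table.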
Upvotes: 2
Reputation: 3619
To simulate the join with the array column you can use an explode and filter combo:
val result = ds.withColumn("expl", explode(col("holidays")))
.filter("lastHolidayDestination = expl.destination")
.withColumn("lastHolidayYear", col("expl.year"))
.drop("expl")
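One caveat worth noting (my observation, not stated in the answer): because the filter keeps only matching exploded rows, this behaves like an inner join, so rows whose lastHolidayDestination matches no holidays element are dropped entirely. A plain-Python sketch (not Spark code; names are illustrative) of the explode/filter/drop pipeline shows this:

```python
# Plain-Python analogue of explode + filter: each row is expanded to one row
# per holidays element, then non-matching pairs are filtered out.
holidays = [{"destination": "Florida", "year": 2020},
            {"destination": "Lille", "year": 2019}]

rows = [
    {"name": "Ben", "lastHolidayDestination": "Florida"},
    {"name": "Alice", "lastHolidayDestination": "Peru"},  # no matching holiday
]

# explode(col("holidays")): one output row per (row, holiday) pair
exploded = [dict(row, expl=h) for row in rows for h in holidays]

# filter("lastHolidayDestination = expl.destination")
kept = [r for r in exploded
        if r["lastHolidayDestination"] == r["expl"]["destination"]]

# withColumn("lastHolidayYear", col("expl.year")) then drop("expl")
result = [{**{k: v for k, v in r.items() if k != "expl"},
           "lastHolidayYear": r["expl"]["year"]} for r in kept]
# Alice's row is gone: only rows with a matching destination survive.
```

If you need to keep non-matching rows (with a null lastHolidayYear, as in the accepted answer's output), the filter/element_at approach above does that, while this explode-based version silently drops them.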
Upvotes: 2