Reputation: 1924
I have two data sources, both of which have opinions about the current state of the same set of entities. Either data source may contain the most current data, which may or may not be from the current date. For example:
val df1 = Seq((1, "green", "there", "2018-01-19"), (2, "yellow", "there", "2018-01-18"), (4, "yellow", "here", "2018-01-20")).toDF("id", "status", "location", "date")
val df2 = Seq((2, "red", "here", "2018-01-20"), (3, "green", "there", "2018-01-20"), (4, "green", "here", "2018-01-19")).toDF("id", "status", "location", "date")
df1.show
+---+------+--------+----------+
| id|status|location| date|
+---+------+--------+----------+
| 1| green| there|2018-01-19|
| 2|yellow| there|2018-01-18|
| 4|yellow| here|2018-01-20|
+---+------+--------+----------+
df2.show
+---+------+--------+----------+
| id|status|location| date|
+---+------+--------+----------+
| 2| red| here|2018-01-20|
| 3| green| there|2018-01-20|
| 4| green| here|2018-01-19|
+---+------+--------+----------+
I want the output to be the set of most current states for each entity:
+---+------+--------+----------+
| id|status|location| date|
+---+------+--------+----------+
| 1| green| there|2018-01-19|
| 2| red| here|2018-01-20|
| 3| green| there|2018-01-20|
| 4|yellow| here|2018-01-20|
+---+------+--------+----------+
My approach, which seems to work, is to join the two tables and then do a kind of custom coalesce operation based on date:
val joined = df1.join(df2, df1("id") === df2("id"), "outer")
+----+------+--------+----------+----+------+--------+----------+
| id|status|location| date| id|status|location| date|
+----+------+--------+----------+----+------+--------+----------+
| 1| green| there|2018-01-19|null| null| null| null|
|null| null| null| null| 3| green| there|2018-01-20|
| 4|yellow| here|2018-01-20| 4|yellow| here|2018-01-20|
| 2|yellow| there|2018-01-18| 2| red| here|2018-01-20|
+----+------+--------+----------+----+------+--------+----------+
val weirdCoal(name: String) = when(df1("date") > df2("date") || df2("date").isNull, df1(name)).otherwise(df2(name)) as name
val ouput = joined.select(df1.columns.map(weirdCoal):_*)
+---+------+--------+----------+
| id|status|location| date|
+---+------+--------+----------+
| 1| green| there|2018-01-19|
| 2| red| here|2018-01-20|
| 3| green| there|2018-01-20|
| 4|yellow| here|2018-01-20|
+---+------+--------+----------+
Which is the output I expect.
I can also see doing this via some kind of union / aggregation approach or with a window that partitions by id and sorts by date and takes the last row.
My question: is there an idiomatic way of doing this?
Upvotes: 1
Views: 622
Reputation: 3590
Yes it can be done without join using Window
functions:
df1.union(df2)
.withColumn("rank", rank().over(Window.partitionBy($"id").orderBy($"date".desc)))
.filter($"rank" === 1)
.drop($"rank")
.orderBy($"id")
.show
output:
+---+------+--------+----------+
| id|status|location| date|
+---+------+--------+----------+
| 1| green| there|2018-01-19|
| 2| red| here|2018-01-20|
| 3| green| there|2018-01-20|
| 4|yellow| here|2018-01-20|
+---+------+--------+----------+
the above code partitions the data by id
and finds the top date
among all dates falling under same id
.
Upvotes: 2