Jeremy

Reputation: 1924

Upsert Two Dataframes in Scala

I have two data sources, both of which have opinions about the current state of the same set of entities. Either data source may contain the most current data, which may or may not be from the current date. For example:

val df1 = Seq((1, "green", "there", "2018-01-19"), (2, "yellow", "there", "2018-01-18"), (4, "yellow", "here", "2018-01-20")).toDF("id", "status", "location", "date")

val df2 = Seq((2, "red", "here", "2018-01-20"), (3, "green", "there", "2018-01-20"), (4, "green", "here", "2018-01-19")).toDF("id", "status", "location", "date")

df1.show
+---+------+--------+----------+
| id|status|location|      date|
+---+------+--------+----------+
|  1| green|   there|2018-01-19|
|  2|yellow|   there|2018-01-18|
|  4|yellow|    here|2018-01-20|
+---+------+--------+----------+

df2.show
+---+------+--------+----------+
| id|status|location|      date|
+---+------+--------+----------+
|  2|   red|    here|2018-01-20|
|  3| green|   there|2018-01-20|
|  4| green|    here|2018-01-19|
+---+------+--------+----------+

I want the output to be the set of most current states for each entity:

+---+------+--------+----------+
| id|status|location|      date|
+---+------+--------+----------+
|  1| green|   there|2018-01-19|
|  2|   red|    here|2018-01-20|
|  3| green|   there|2018-01-20|
|  4|yellow|    here|2018-01-20|
+---+------+--------+----------+

My approach, which seems to work, is to join the two tables and then do a kind of custom coalesce operation based on date:

val joined = df1.join(df2, df1("id") === df2("id"), "outer")
+----+------+--------+----------+----+------+--------+----------+
|  id|status|location|      date|  id|status|location|      date|
+----+------+--------+----------+----+------+--------+----------+
|   1| green|   there|2018-01-19|null|  null|    null|      null| 
|null|  null|    null|      null|   3| green|   there|2018-01-20| 
|   4|yellow|    here|2018-01-20|   4| green|    here|2018-01-19|
|   2|yellow|   there|2018-01-18|   2|   red|    here|2018-01-20|
+----+------+--------+----------+----+------+--------+----------+

def weirdCoal(name: String) = when(df1("date") > df2("date") || df2("date").isNull, df1(name)).otherwise(df2(name)).as(name)

val output = joined.select(df1.columns.map(weirdCoal): _*)
+---+------+--------+----------+
| id|status|location|      date|
+---+------+--------+----------+
|  1| green|   there|2018-01-19|
|  2|   red|    here|2018-01-20|
|  3| green|   there|2018-01-20|
|  4|yellow|    here|2018-01-20|
+---+------+--------+----------+

Which is the output I expect.

I can also see doing this via some kind of union / aggregation approach or with a window that partitions by id and sorts by date and takes the last row.
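The union / aggregation idea can be modeled with plain Scala collections (a hypothetical sketch of the semantics only, not Spark code; `Row` and the field names here just mirror the DataFrame columns above):

    case class Row(id: Int, status: String, location: String, date: String)

    val rows1 = Seq(Row(1, "green", "there", "2018-01-19"),
                    Row(2, "yellow", "there", "2018-01-18"),
                    Row(4, "yellow", "here", "2018-01-20"))
    val rows2 = Seq(Row(2, "red", "here", "2018-01-20"),
                    Row(3, "green", "there", "2018-01-20"),
                    Row(4, "green", "here", "2018-01-19"))

    // Union both sources, then keep the latest row per id.
    // ISO-8601 date strings compare correctly lexicographically.
    val latest = (rows1 ++ rows2)
      .groupBy(_.id)
      .values
      .map(_.maxBy(_.date))
      .toSeq
      .sortBy(_.id)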

My question: is there an idiomatic way of doing this?

Upvotes: 1

Views: 622

Answers (1)

vdep

Reputation: 3590

Yes, it can be done without a join, using window functions:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.rank

df1.union(df2)
  .withColumn("rank", rank().over(Window.partitionBy($"id").orderBy($"date".desc)))
  .filter($"rank" === 1)
  .drop($"rank")
  .orderBy($"id")
  .show

output:

+---+------+--------+----------+
| id|status|location|      date|
+---+------+--------+----------+
|  1| green|   there|2018-01-19|
|  2|   red|    here|2018-01-20|
|  3| green|   there|2018-01-20|
|  4|yellow|    here|2018-01-20|
+---+------+--------+----------+

The code above unions the two DataFrames, partitions the rows by id, and keeps only the row with the most recent date within each partition.
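One caveat, assuming the data could ever contain two rows for the same id with the same date: rank() assigns equal ranks to ties, so both rows would pass the filter. Using row_number() instead guarantees exactly one row per id (ties broken arbitrarily):

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.row_number

    df1.union(df2)
      .withColumn("rn", row_number().over(Window.partitionBy($"id").orderBy($"date".desc)))
      .filter($"rn" === 1)
      .drop("rn")
      .orderBy($"id")
      .show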

Upvotes: 2
