Jon Watte

Reputation: 7208

In Spark 1.6 / Scala, getting a column value correlated with an aggregate

Let's say I have a DataFrame with three columns:

itemid, date, price
1, 2017-05-18, $1.10
2, 2017-05-18, $2.20
1, 2017-04-12, $0.90
1, 2017-03-29, $1.00

Now, I want to group by itemid, get the earliest date, and get the price that matches the earliest date. (We can assume (itemid, date) is unique.)

The output for the input above would be:

1, 2017-03-29, $1.00
2, 2017-05-18, $2.20

In SQL, I can do this using a self-join -- first select the minimum date for each itemid, and then select the price and date where the date matches that minimum date.

How do I express this in Scala Spark DataFrames? If the answer still involves the self-join, is the DataFrame query executor in Spark 1.6 smart enough to not actually materialize the join?
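For reference, here is a rough sketch of the self-join version I have in mind (untested, and assuming the data is already in a DataFrame named df with the columns above):

import org.apache.spark.sql.functions.min

// Earliest date per itemid, then join back to pick up the matching price
val minDates = df.groupBy("itemid").agg(min("date").as("date"))
val earliest = df.join(minDates, Seq("itemid", "date"))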

Upvotes: 0

Views: 246

Answers (1)

Leo C

Reputation: 22449

One approach would be to use a Spark SQL window function, similar to the following:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.min
import sqlContext.implicits._  // for toDF and the $"..." column syntax
// Note: in Spark 1.6, window functions require a HiveContext rather than a plain SQLContext

// Keep the data as a DataFrame (the Spark 1.6 Dataset API doesn't
// support withColumn or where on Columns)
val df = Seq(
    (1, "2017-05-18", 1.10),
    (2, "2017-05-18", 2.20),
    (1, "2017-04-12", 0.90),
    (1, "2017-03-29", 1.00)
  ).toDF(
    "itemid", "date", "price"
  )

// Add earliest date by itemid via window function and
// keep only rows with earliest date by itemid
val df2 = df.withColumn("earliestDate", min("date").over(
    Window.partitionBy("itemid")
  )).
  where($"date" === $"earliestDate")

df2.show
+------+----------+-----+------------+
|itemid|      date|price|earliestDate|
+------+----------+-----+------------+
|     1|2017-03-29|  1.0|  2017-03-29|
|     2|2017-05-18|  2.2|  2017-05-18|
+------+----------+-----+------------+
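
If the helper column isn't wanted in the final output, it can be dropped afterwards to match the requested (itemid, date, price) shape:

df2.drop("earliestDate").show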

Upvotes: 1
