Reputation: 7208
Let's say I have a DataFrame with three columns:
itemid, date, price
1, 2017-05-18, $1.10
2, 2017-05-18, $2.20
1, 2017-04-12, $0.90
1, 2017-03-29, $1.00
Now, I want to group by itemid, get the earliest date, and get the price that matches the earliest date. (We can assume (itemid, date) is unique.)
The output for the input above would be:
1, 2017-03-29, $1.00
2, 2017-05-18, $2.20
In SQL, I can do this using a self-join -- first select the minimum date for each itemid, and then select the price and date where the date matches that minimum date.
How do I express this in Scala Spark DataFrames? If the answer still involves the self-join, is the DataFrame query executor in Spark 1.6 smart enough to not actually materialize the join?
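For concreteness, the literal DataFrame translation of the self-join I have in mind would be something like this rough sketch (assuming df holds the sample data above; names are only illustrative):
import org.apache.spark.sql.functions.min
// Assumes the shell implicits (spark.implicits._, or sqlContext.implicits._ on 1.6)
// and a DataFrame `df` with columns itemid, date, price as in the sample, e.g.
//   val df = Seq((1, "2017-05-18", 1.10), (2, "2017-05-18", 2.20),
//                (1, "2017-04-12", 0.90), (1, "2017-03-29", 1.00))
//     .toDF("itemid", "date", "price")

// Step 1: earliest date per itemid (the inner query of the SQL version)
val earliest = df.groupBy("itemid").agg(min("date").as("date"))

// Step 2: join back on (itemid, date) to pick up the matching price
val result = df.join(earliest, Seq("itemid", "date"))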
Upvotes: 0
Views: 246
Reputation: 22449
One approach would be to use a Spark SQL window function, similar to the following:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.min
// toDF and the $"..." syntax below come from the shell implicits
// (spark.implicits._, or sqlContext.implicits._ on Spark 1.6).

val df = Seq(
  (1, "2017-05-18", 1.10),
  (2, "2017-05-18", 2.20),
  (1, "2017-04-12", 0.90),
  (1, "2017-03-29", 1.00)
).toDF(
  "itemid", "date", "price"
).as[(Integer, String, Double)]

// Add the earliest date per itemid via a window function and
// keep only the rows whose date equals that earliest date.
// (min works on the yyyy-MM-dd strings because they sort lexicographically.)
val df2 = df.
  withColumn("earliestDate", min("date").over(Window.partitionBy("itemid"))).
  where($"date" === $"earliestDate")

df2.show
+------+----------+-----+------------+
|itemid| date|price|earliestDate|
+------+----------+-----+------------+
| 1|2017-03-29| 1.0| 2017-03-29|
| 2|2017-05-18| 2.2| 2017-05-18|
+------+----------+-----+------------+
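The helper column can then be dropped to get back to just the three columns from the question, and explain can be used to inspect the physical plan (for instance, to compare it with a join-based version); a small sketch:
// Drop the helper column so only itemid, date, price remain
val cleaned = df2.drop("earliestDate")

// Print the extended plan; handy for comparing this window-based version
// against the aggregate-then-join alternative.
cleaned.explain(true)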
Upvotes: 1