Peter Kažmirský

Reputation: 107

Apache spark aggregation: aggregate column based on another column value

I am not sure if I am asking this correctly, and maybe that is why I haven't found the right answer so far. If it turns out to be a duplicate, I will delete this question.

I have the following data:

id | last_updated | count
__________________________
1  | 20190101     | 3
1  | 20190201     | 2
1  | 20190301     | 1 

I want to group this data by the "id" column, take the max value of "last_updated", and for the "count" column keep the value from the row where "last_updated" is max. So the result should look like this:

id | last_updated | count
__________________________
1  | 20190301     | 1 

So I imagine it would look something like this:

df
  .groupBy("id")
  .agg(max("last_updated"), ... ("count"))

Is there any function I can use to get "count" based on the "last_updated" column?

I am using Spark 2.4.0.

Thanks for any help

Upvotes: 3

Views: 2521

Answers (1)

SCouto

Reputation: 7928

You have two options; in my understanding, the first is the better one.

OPTION 1: Use a window function partitioned by "id" and add a column holding the max of "last_updated" over that window. Then keep only the rows where "last_updated" equals that max, drop the original column, and rename the max column as desired.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.max

// Window over all rows sharing the same id
val w = Window.partitionBy("id")

df.withColumn("max", max("last_updated").over(w)) // max last_updated per id
  .where("max = last_updated")                    // keep only the row(s) holding the max
  .drop("last_updated")
  .withColumnRenamed("max", "last_updated")

OPTION 2

Group by "id", take the max of "last_updated", and then join back with the original DataFrame to bring in the matching "count".

df.groupBy("id")
.agg(max("last_updated").as("last_updated"))
.join(df, Seq("id", "last_updated"))

QUICK EXAMPLE

INPUT

df.show
+---+------------+-----+
| id|last_updated|count|
+---+------------+-----+
|  1|    20190101|    3|
|  1|    20190201|    2|
|  1|    20190301|    1|
+---+------------+-----+

OUTPUT Option 1

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.max

val w = Window.partitionBy("id")

df.withColumn("max", max("last_updated").over(w))
  .where("max = last_updated")
  .drop("last_updated")
  .withColumnRenamed("max", "last_updated")


+---+-----+------------+
| id|count|last_updated|
+---+-----+------------+
|  1|    1|    20190301|
+---+-----+------------+
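
Note that the rename leaves "last_updated" as the last column, which is why the column order above differs from the input. If the original order matters, a final select restores it (a small sketch reusing the same names defined above):

df.withColumn("max", max("last_updated").over(w))
  .where("max = last_updated")
  .drop("last_updated")
  .withColumnRenamed("max", "last_updated")
  .select("id", "last_updated", "count") // restore the original column order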

Option 2

  df.groupBy("id")
      .agg(max("last_updated").as("last_updated")
      .join(df, Seq("id", "last_updated")).show


+---+------------+-----+
| id|last_updated|count|
+---+------------+-----+
|  1|    20190301|    1|
+---+------------+-----+
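
If you want to avoid both the window and the join, a single aggregation over a struct also works, because Spark orders structs field by field: the max of a (last_updated, count) struct carries the matching count along. A minimal sketch, assuming the same column names as above:

import org.apache.spark.sql.functions.{col, max, struct}

df.groupBy("id")
  .agg(max(struct(col("last_updated"), col("count"))).as("m")) // max by last_updated; count tags along
  .select(col("id"), col("m.last_updated"), col("m.count"))
  .show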

Upvotes: 2
