ana0017

Reputation: 39

How do I find a max of a column for each unique value in another column in Spark?

Say I have a dataset similar to this: [sample data, originally posted as an image]

My final product needs to be a row for each day of the Week with the Place that had the most Activities for that day, e.g. Mon Place A 56, Wed Place C 64, etc. I have tried using the Window function with max and groupBy, but I am getting myself confused.

Upvotes: 1

Views: 1124

Answers (2)

werner

Reputation: 14905

Spark 3.0 introduced the aggregation function max_by, which does exactly what you are looking for:

df.groupBy("day")
   .agg(expr("max_by(place, number)"), max('number))
   .show()

Result:

+-----+---------------------+-----------+
|  day|max_by(place, number)|max(number)|
+-----+---------------------+-----------+
|  Mon|              Place B|         42|
|  Wed|              Place F|         54|
|  Fri|              Place E|         64|
|Thurs|              Place D|         45|
+-----+---------------------+-----------+
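For completeness, a self-contained sketch of the same approach. This assumes Spark 3.0+ and an active SparkSession named `spark` (as in spark-shell), with the question's data approximated by the sample used in the other answer:

```scala
import org.apache.spark.sql.functions.{col, expr, max}
import spark.implicits._  // requires an active SparkSession named `spark`

val df = Seq(
  ("Mon", "Place A", 10),
  ("Mon", "Place B", 42),
  ("Wed", "Place F", 54),
  ("Fri", "Place E", 64),
  ("Thurs", "Place D", 45)
).toDF("day", "place", "number")

// max_by(place, number) returns the place of the row with the largest number per day
df.groupBy("day")
  .agg(expr("max_by(place, number)").as("place"), max(col("number")).as("number"))
  .show()
```

Unlike the window-function approach in the other answer, this is a plain aggregation, so no per-partition sort over the full rows is needed.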

Upvotes: 1

Boris Azanov

Reputation: 4501

For your purposes you need a window function:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, lit, row_number}
import spark.implicits._

val df = Seq(
  ("Mon", "Place A", 10),
  ("Mon", "Place B", 42),
  ("Wed", "Place C", 41),
  ("Thurs", "Place D", 45),
  ("Fri", "Place E", 64),
  ("Fri", "Place A", 12),
  ("Wed", "Place F", 54),
  ("Wed", "Place A", 1)
).toDF("day", "place", "number")
df.show()
df.withColumn("orderedNumberForDay",
  row_number()
    .over(
      Window.orderBy(col("number").desc)
        .partitionBy("day")
    )
).filter(col("orderedNumberForDay") === lit(1))
 .select("day", "place", "number")
 .show()
/*                            
+-----+-------+------+        +-----+-------+------+
|  day|  place|number|        |  day|  place|number|
+-----+-------+------+        +-----+-------+------+
|  Mon|Place A|    10|        |  Mon|Place B|    42|
|  Mon|Place B|    42|  ===>> |  Wed|Place F|    54|
|  Wed|Place C|    41|        |  Fri|Place E|    64|
|Thurs|Place D|    45|        |Thurs|Place D|    45|
|  Fri|Place E|    64|        +-----+-------+------+
|  Fri|Place A|    12|   
|  Wed|Place F|    54|   
|  Wed|Place A|     1|   
+-----+-------+------+
*/

A brief explanation of how it works.

First you need to add a column holding the window function result:

df.withColumn("orderedNumberForDay",
  row_number()
    .over(
      Window.orderBy(col("number").desc)
      .partitionBy("day")
    )
)

row_number() is a counter of rows inside a partition. A partition is like a group in a group by: partitionBy("day") groups together rows with the same day value. We then order each partition by number in descending order, hence orderBy(col("number").desc). Finally, over binds row_number to the window specification, so the counter is computed within each partition in that order.
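The same per-partition logic can be sketched with plain Scala collections (an analogy for intuition, not the Spark API): group rows by day, sort each group by number descending, and number the rows, which is exactly what row_number().over(...) computes.

```scala
// Plain-Scala analogy of Window.partitionBy("day").orderBy(col("number").desc) + row_number():
val rows = Seq(
  ("Mon", "Place A", 10), ("Mon", "Place B", 42),
  ("Wed", "Place C", 41), ("Wed", "Place F", 54), ("Wed", "Place A", 1)
)

val numbered = rows
  .groupBy(_._1)                    // partitionBy("day")
  .flatMap { case (_, group) =>
    group.sortBy(-_._3)             // orderBy(col("number").desc)
      .zipWithIndex                 // row_number(), 0-based here
      .map { case ((day, place, n), i) => (day, place, n, i + 1) }
  }
  .toList

// Keeping only rank 1 yields the per-day maximum:
val top = numbered.filter(_._4 == 1).map { case (d, p, n, _) => (d, p, n) }
```

Spark does the same thing, but distributed: each partition can be ranked independently, which is why partitionBy matters for parallelism.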

After executing this stage we will have the following data:

+-----+-------+------+-------------------+
|  day|  place|number|orderedNumberForDay|
+-----+-------+------+-------------------+
|  Mon|Place B|    42|                  1|
|  Mon|Place A|    10|                  2|
|  Wed|Place F|    54|                  1|
|  Wed|Place C|    41|                  2|
|  Wed|Place A|     1|                  3|
|  Fri|Place E|    64|                  1|
|  Fri|Place A|    12|                  2|
|Thurs|Place D|    45|                  1|
+-----+-------+------+-------------------+

So, all we need to do is filter the rows where orderedNumberForDay equals 1 (these hold the max number for each day) and select the original columns: day, place, number. The final result will be:

+-----+-------+------+
|  day|  place|number|
+-----+-------+------+
|  Mon|Place B|    42|
|  Wed|Place F|    54|
|  Fri|Place E|    64|
|Thurs|Place D|    45|
+-----+-------+------+

Upvotes: 1
