b-ryce

Reputation: 5828

Scala Spark use Window function to find max value

I have a data set that looks like this:

+------------------------+-----+
|               timestamp| zone|
+------------------------+-----+
|    2019-01-01 00:05:00 |    A|
|    2019-01-01 00:05:00 |    A|
|    2019-01-01 00:05:00 |    B|
|    2019-01-01 01:05:00 |    C|
|    2019-01-01 02:05:00 |    B|
|    2019-01-01 02:05:00 |    B|
+------------------------+-----+

For each hour, I need to find which zone had the most rows and end up with a table that looks like this:

+-----+-----+-----+
| hour| zone| max |
+-----+-----+-----+
|    0|    A|    2|
|    1|    C|    1|
|    2|    B|    2|
+-----+-----+-----+

My instructions say that I need to use the Window function along with "group by" to find my max count.

I've tried a few things but I'm not sure if I'm close. Any help would be appreciated.

Upvotes: 6

Views: 8976

Answers (2)

Chema

Reputation: 2828

You can combine a group by with a window function on DataFrames.

In your case, first count the rows per hour and zone, then apply rank() over a window partitioned by hour and keep the rows with rank 1.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// data_tms is the input DataFrame with columns "timestamp" and "zone"
// first: group by hour and zone, counting the rows in each group
val df_group = data_tms
  .select(hour(col("timestamp")).as("hour"), col("zone"))
  .groupBy(col("hour"), col("zone"))
  .agg(count("zone").as("max"))

// second: rank the zones within each hour by count, highest first
val df_rank = df_group
  .select(col("hour"), col("zone"), col("max"),
    rank().over(Window.partitionBy(col("hour")).orderBy(col("max").desc)).as("rank"))

// third: keep only rank 1 (zones tied for the top count all get rank 1)
df_rank
  .where(col("rank") === 1)
  .select(col("hour"), col("zone"), col("max"))
  .orderBy(col("hour"))
  .show()

/*
+----+----+---+
|hour|zone|max|
+----+----+---+
|   0|   A|  2|
|   1|   C|  1|
|   2|   B|  2|
+----+----+---+
*/
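
If it helps to see the logic without Spark, the same two steps (count per hour and zone, then keep the top count within each hour) can be sketched with plain Scala collections. This is only an illustration on the sample rows from the question, not a replacement for the DataFrame code; the names `TopZonePerHour`, `countsPerHourZone` and `topZonePerHour` are made up for this sketch.

```scala
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

object TopZonePerHour {
  // Sample rows copied from the question: (timestamp, zone)
  val rows: Seq[(String, String)] = Seq(
    ("2019-01-01 00:05:00", "A"),
    ("2019-01-01 00:05:00", "A"),
    ("2019-01-01 00:05:00", "B"),
    ("2019-01-01 01:05:00", "C"),
    ("2019-01-01 02:05:00", "B"),
    ("2019-01-01 02:05:00", "B")
  )

  private val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

  // Step 1: count rows per (hour, zone), analogous to groupBy + count.
  def countsPerHourZone(data: Seq[(String, String)]): Map[(Int, String), Int] =
    data
      .map { case (ts, zone) => (LocalDateTime.parse(ts, fmt).getHour, zone) }
      .groupBy(identity)
      .map { case (key, group) => key -> group.size }

  // Step 2: within each hour keep the zone(s) with the highest count,
  // analogous to filtering on rank() == 1 over a window partitioned by hour.
  def topZonePerHour(data: Seq[(String, String)]): Seq[(Int, String, Int)] =
    countsPerHourZone(data).toSeq
      .map { case ((hour, zone), cnt) => (hour, zone, cnt) }
      .groupBy(_._1)
      .toSeq
      .flatMap { case (_, perHour) =>
        val best = perHour.map(_._3).max
        perHour.filter(_._3 == best)
      }
      .sortBy { case (hour, zone, _) => (hour, zone) }

  def main(args: Array[String]): Unit =
    topZonePerHour(rows).foreach(println)
}
```

The "partition by hour" part of the window corresponds to the second `groupBy(_._1)`: the maximum is computed separately inside each hour rather than over the whole data set.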

Upvotes: 6

Raphael Roth

Reputation: 27373

You can chain two window functions to get your result: the first counts the rows per hour and zone, the second numbers the rows within each hour by that count:

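import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
// the $"..." syntax also needs import spark.implicits._ (automatic in spark-shell)

df
  .withColumn("hour", hour($"timestamp"))
  // rows per (hour, zone), attached to every input row
  .withColumn("cnt", count("*").over(Window.partitionBy($"hour", $"zone")))
  // position within the hour, highest count first
  .withColumn("rnb", row_number().over(Window.partitionBy($"hour").orderBy($"cnt".desc)))
  .where($"rnb" === 1)
  .select($"hour", $"zone", $"cnt".as("max"))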
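
One thing to be aware of is how ties behave. With row_number() exactly one row per hour survives, and when two zones share the top count Spark does not guarantee which one that is unless the window ordering includes a tie-breaker (for example a secondary sort on zone). With rank(), all tied zones are kept. A rough sketch of the difference in plain Scala collections, using hypothetical counts where A and B tie in hour 0 (the names `TieHandling`, `keepAllTop` and `keepOneTop` are invented for this example, and the tie-break by zone name is an assumption of the sketch):

```scala
object TieHandling {
  // Hypothetical per-(hour, zone) counts: zones A and B tie in hour 0.
  val counts: Seq[(Int, String, Int)] = Seq(
    (0, "A", 2),
    (0, "B", 2),
    (1, "C", 1)
  )

  // rank()-style: every zone whose count equals the hourly maximum is kept.
  def keepAllTop(data: Seq[(Int, String, Int)]): Seq[(Int, String, Int)] =
    data.groupBy(_._1).toSeq.flatMap { case (_, perHour) =>
      val best = perHour.map(_._3).max
      perHour.filter(_._3 == best)
    }.sortBy { case (hour, zone, _) => (hour, zone) }

  // row_number()-style: exactly one zone per hour. Ties are broken by zone
  // name here so the result is deterministic (an assumption of this sketch).
  def keepOneTop(data: Seq[(Int, String, Int)]): Seq[(Int, String, Int)] =
    data.groupBy(_._1).toSeq.map { case (_, perHour) =>
      perHour.sortBy { case (_, zone, cnt) => (-cnt, zone) }.head
    }.sortBy(_._1)

  def main(args: Array[String]): Unit = {
    println(keepAllTop(counts)) // hour 0 appears twice (A and B)
    println(keepOneTop(counts)) // hour 0 appears once (A)
  }
}
```

Which behaviour you want depends on whether the result table should have exactly one row per hour or should show every zone that reached the maximum.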

Upvotes: 9
