russellrd

Reputation: 27

Group DF by hour of day

I've read a bunch of threads, but I can't find what I'm looking for in Spark's Scala API (I've found it for PySpark, which I cannot use). I'm pretty close with what I have, but I have a few questions.

I'm working off a DF that looks like the following:

PULocationID  pickup_datetime      number_of_pickups  Borough   Zone
75            2019-01-19 02:13:00  5                  Brooklyn  Williamsburg
255           2019-01-19 12:05:00  8                  Brooklyn  Williamsburg
99            2019-01-20 12:05:00  3                  Brooklyn  DUMBO
102           2019-01-01 02:05:00  1                  Brooklyn  DUMBO
10            2019-01-07 11:05:00  13                 Brooklyn  Park Slope
75            2019-01-01 11:05:00  2                  Brooklyn  Williamsburg
12            2019-01-11 01:05:00  1                  Brooklyn  Park Slope
98            2019-01-28 01:05:00  8                  Brooklyn  DUMBO
75            2019-01-10 00:05:00  8                  Brooklyn  Williamsburg
255           2019-01-11 12:05:00  12                 Brooklyn  DUMBO

I need to pull the zone with the highest number of pickups for each hour of day. hour_of_day needs to be an integer, zone a string, and max_count an integer.

hour_of_day  zone          max_count
0            Williamsburg  8
1            DUMBO         8
2            Williamsburg  5
11           Park Slope    13
12           DUMBO         15

Here's what I had:

val groupByWindow = BK_joined
  .groupBy(window(col("pickup_datetime"), "1 hour").as("hour_of_day"))
  .agg(max("number_of_pickups").as("max_count"))
  .select("hour_of_day", "Zone", "max_count")

There are two problems with this:

1. window() keys each calendar hour separately (2019-01-19 02:00 and 2019-01-01 02:00 land in different buckets), instead of collapsing everything into an hour of day from 0 to 23.
2. "Zone" can't be selected, since it isn't part of the groupBy or the aggregation, so Spark throws an AnalysisException.

I tried making a table that was already grouped by zone and time, but I don't think that would give me the right solution...

val groupByWindow = BK_joined.groupBy("Zone", "pickup_datetime")
.agg(max("number_of_pickups").as("max_count")).select("pickup_datetime", "Zone", "max_count")

Even if this is right, I still can't figure out how to get the timestamp grouped by hour of day. I tried various versions of the following:

val windowed = groupByWindow.groupBy(window(col("pickup_datetime"), "1 hour").as("hour_of_day"))

Upvotes: 0

Views: 757

Answers (1)

Mageswaran

Reputation: 450

The trick is to convert the string column to a timestamp, use the hour SQL function to extract the hour of day, rank the rows within each hour with a Window spec and row_number(), and finally keep only the rows with row number 1.

Check the online code version at https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/8963851468310921/992546394267440/5846184720595634/latest.html

import org.apache.spark.sql.{functions => F}
import org.apache.spark.sql.expressions.Window
import spark.implicits._  // for .toDF; `spark` is the active SparkSession (predefined in notebooks/shell)

val df = Seq(
  (75,  "2019-01-19 02:13:00", 5,  "Brooklyn", "Williamsburg"),
  (255, "2019-01-19 12:05:00", 8,  "Brooklyn", "Williamsburg"),
  (99,  "2019-01-20 12:05:00", 3,  "Brooklyn", "DUMBO"),
  (102, "2019-01-01 02:05:00", 1,  "Brooklyn", "DUMBO"),
  (10,  "2019-01-07 11:05:00", 13, "Brooklyn", "Park Slope"),
  (75,  "2019-01-01 11:05:00", 2,  "Brooklyn", "Williamsburg"),
  (12,  "2019-01-11 01:05:00", 1,  "Brooklyn", "Park Slope"),
  (98,  "2019-01-28 01:05:00", 8,  "Brooklyn", "DUMBO"),
  (75,  "2019-01-10 00:05:00", 8,  "Brooklyn", "Williamsburg"),
  (255, "2019-01-11 12:05:00", 12, "Brooklyn", "DUMBO")
).toDF("PULocationID", "pickup_datetime", "number_of_pickups", "Borough", "Zone")
df.show()

// Parse the string column into a proper timestamp, then extract the hour of day (0-23)
val df1 = df
  .withColumn("pickup_datetime", F.to_timestamp(F.col("pickup_datetime"), "yyyy-MM-dd HH:mm:ss"))
  .withColumn("hour", F.hour(F.col("pickup_datetime")))
df1.show()
df1.printSchema()

// Rank the rows within each hour, highest pickup count first
val windowSpec = Window.partitionBy("hour").orderBy(F.desc("number_of_pickups"))
val df2 = df1.withColumn("rn", F.row_number().over(windowSpec))

// Keep only the top-ranked row per hour
df2.filter(F.col("rn") === 1).drop("rn").select("hour", "Zone", "number_of_pickups").show()
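One thing to watch: row_number() here ranks the raw rows, so hour 12 comes out as DUMBO with 12, while the expected output in the question shows 15 (DUMBO's two hour-12 rows summed, 3 + 12). If per-zone totals are what is wanted, sum before ranking. Here is a minimal sketch building on df1 above (the names hourlyTotals and byHour are just illustrative), with the columns renamed to the hour_of_day/zone/max_count schema from the question:

// Assumption: the expected output sums pickups per (hour, Zone) before picking the max.
// Builds on df1 from above (timestamp parsed, "hour" column already extracted).
val hourlyTotals = df1
  .groupBy("hour", "Zone")
  .agg(F.sum("number_of_pickups").as("total_pickups"))

// Rank each hour's zones by their summed pickup count, highest first
val byHour = Window.partitionBy("hour").orderBy(F.desc("total_pickups"))

hourlyTotals
  .withColumn("rn", F.row_number().over(byHour))
  .filter(F.col("rn") === 1)
  .select(
    F.col("hour").as("hour_of_day"),                     // already an integer
    F.col("Zone").as("zone"),
    F.col("total_pickups").cast("int").as("max_count"))  // sum() returns a long; cast to match the asked-for type
  .orderBy("hour_of_day")
  .show()

Also note that row_number() breaks ties arbitrarily; if tied zones should all be kept, rank() would return every zone sharing the top count.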

Upvotes: 1
