Reputation: 27
I've read a bunch of threads, but I can't find what I'm looking for in Apache Spark (though I've found it in PySpark, which I cannot use). I'm pretty close with what I have, but I have a few questions.
I'm working off a DataFrame that looks like the following:
| PULocationID | pickup_datetime | number_of_pickups | Borough | Zone |
|---|---|---|---|---|
| 75 | 2019-01-19 02:13:00 | 5 | Brooklyn | Williamsburg |
| 255 | 2019-01-19 12:05:00 | 8 | Brooklyn | Williamsburg |
| 99 | 2019-01-20 12:05:00 | 3 | Brooklyn | DUMBO |
| 102 | 2019-01-01 02:05:00 | 1 | Brooklyn | DUMBO |
| 10 | 2019-01-07 11:05:00 | 13 | Brooklyn | Park Slope |
| 75 | 2019-01-01 11:05:00 | 2 | Brooklyn | Williamsburg |
| 12 | 2019-01-11 01:05:00 | 1 | Brooklyn | Park Slope |
| 98 | 2019-01-28 01:05:00 | 8 | Brooklyn | DUMBO |
| 75 | 2019-01-10 00:05:00 | 8 | Brooklyn | Williamsburg |
| 255 | 2019-01-11 12:05:00 | 12 | Brooklyn | DUMBO |
I need to pull the zone with the highest number of pickups for each hour of the day. hour_of_day needs to be an integer, zone a string, and max_count an integer. The expected output looks like this:
| hour_of_day | zone | max_count |
|---|---|---|
| 0 | Williamsburg | 8 |
| 1 | DUMBO | 8 |
| 2 | Williamsburg | 5 |
| 11 | Park Slope | 13 |
| 12 | DUMBO | 15 |
Here's what I had:
val groupByWindow = BK_joined.groupBy(window(col("pickup_datetime"), "1 hour").as("hour_of_day"))
.agg(max("number_of_pickups").as("max_count")).select("hour_of_day", "Zone", "max_count")
There are two problems with this. First, Zone isn't carried through the aggregation, so the select fails with "cannot resolve 'Zone' given input columns: [hour_of_day, max_count]". Second, hour_of_day comes back as a window (a start/end timestamp range) rather than the integer hour I need.
I tried making a table that was already grouped by zone and time, but I don't think that would give me the right solution...
val groupByWindow = BK_joined.groupBy("Zone", "pickup_datetime")
.agg(max("number_of_pickups").as("max_count")).select("pickup_datetime", "Zone", "max_count")
Even if this is right, I still can't figure out how to get the timestamp grouped by hour of day. I tried various versions of the below:
val windowed = groupByWindow.groupBy(window(col("pickup_datetime"), "1 hour").as("hour_of_day"))
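For what it's worth, part of what's tripping me up is that the window column comes back as a struct of start/end timestamps (a dated one-hour bucket) rather than the integer hour of day I need. A rough sketch of what I mean, assuming pickup_datetime is already a timestamp column on BK_joined as in my attempts above:
import org.apache.spark.sql.functions.{col, window}

// Grouping by window() yields a struct<start: timestamp, end: timestamp> column --
// a specific one-hour range on a specific date, not a 0-23 hour of day.
val bucketed = BK_joined
  .groupBy(window(col("pickup_datetime"), "1 hour").as("hour_of_day"))
  .count()

bucketed.printSchema()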
Upvotes: 0
Views: 757
Reputation: 450
The trick is to convert the string column to a timestamp type, use the SQL hour function to extract the hour of day, rank the rows with row_number() over a Window spec partitioned by hour, and finally keep only row number 1.
Check the online version of the code at https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/8963851468310921/992546394267440/5846184720595634/latest.html
import org.apache.spark.sql.{functions => F}
import org.apache.spark.sql.expressions.Window
// toDF below relies on spark.implicits._, which is already in scope in spark-shell and Databricks notebooks.
// Recreate the sample data from the question.
val df = Seq(
  (75,  "2019-01-19 02:13:00", 5,  "Brooklyn", "Williamsburg"),
  (255, "2019-01-19 12:05:00", 8,  "Brooklyn", "Williamsburg"),
  (99,  "2019-01-20 12:05:00", 3,  "Brooklyn", "DUMBO"),
  (102, "2019-01-01 02:05:00", 1,  "Brooklyn", "DUMBO"),
  (10,  "2019-01-07 11:05:00", 13, "Brooklyn", "Park Slope"),
  (75,  "2019-01-01 11:05:00", 2,  "Brooklyn", "Williamsburg"),
  (12,  "2019-01-11 01:05:00", 1,  "Brooklyn", "Park Slope"),
  (98,  "2019-01-28 01:05:00", 8,  "Brooklyn", "DUMBO"),
  (75,  "2019-01-10 00:05:00", 8,  "Brooklyn", "Williamsburg"),
  (255, "2019-01-11 12:05:00", 12, "Brooklyn", "DUMBO")
).toDF("PULocationID", "pickup_datetime", "number_of_pickups", "Borough", "Zone")
df.show()
// Parse the pickup time string into a timestamp, then extract the hour of day (0-23) as an integer.
val df1 = df.
  withColumn("pickup_datetime", F.to_timestamp(F.col("pickup_datetime"), "yyyy-MM-dd HH:mm:ss")).
  withColumn("hour", F.hour(F.col("pickup_datetime")))
df1.show()
df1.printSchema()
// Rank rows within each hour by number_of_pickups, highest first...
val windowSpec = Window.partitionBy("hour").orderBy(F.desc("number_of_pickups"))
val df2 = df1.withColumn("rn", F.row_number().over(windowSpec))
// ...and keep only the top-ranked row per hour.
df2.filter(F.col("rn") === 1).drop("rn").select("hour", "Zone", "number_of_pickups").show()
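Two optional tweaks, depending on how literally the expected output at the top should be read. To get the exact column names asked for (hour_of_day, zone, max_count), just alias in the final select. And if the 15 shown for hour 12 means pickups should be totalled per zone within each hour before picking the winner (3 + 12 for DUMBO), aggregate with sum before ranking. A sketch building on df1 above:
// Total pickups per (hour, Zone) first -- this is what produces 15 for
// DUMBO at hour 12 (3 + 12). Skip this step if you really want the single
// largest individual row instead.
val perZoneHour = df1
  .groupBy("hour", "Zone")
  .agg(F.sum("number_of_pickups").as("max_count"))

// Keep the top zone per hour and rename to the requested schema.
val topPerHour = perZoneHour
  .withColumn("rn", F.row_number().over(
    Window.partitionBy("hour").orderBy(F.desc("max_count"))))
  .filter(F.col("rn") === 1)
  .select(
    F.col("hour").as("hour_of_day"),
    F.col("Zone").as("zone"),
    F.col("max_count"))

topPerHour.orderBy("hour_of_day").show()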
Upvotes: 1