peakstatus

Reputation: 441

Spark - 30 Minute Generic Windowing

I'm currently working on a Spark script that looks at every 30-minute period and determines the average of a column over that rolling 30-minute window.

The format of my timestamp is of the form MM/dd/yyyy HH:mm:ss AM/PM. Essentially, what I'm looking to do is look at every 30-minute period irrespective of the date (i.e. the average passenger count across all days between, say, 1:02 pm and 1:32 pm).

My current script takes my timestamp, converts it to a Unix timestamp, and stores it in a new column. Then, for the current timestamp, it subtracts 900 seconds and adds 900 seconds to get the records from the previous 15 minutes and the 15 minutes after the current timestamp. This gives me the 30-minute window I'm looking for. It works when I include the MM/dd/yyyy when creating my new column 'timestamp':

import org.apache.spark.sql.functions._ // unix_timestamp, avg
import spark.implicits._                // $"col" syntax

val taxiSub = spark.read.format("csv").option("header", true).option("inferSchema", true).load("/user/zeppelin/taxi/taxi_subset.csv")
taxiSub.createOrReplaceTempView("taxiSub")
val stamp = taxiSub.withColumn("timestamp", unix_timestamp($"tpep_pickup_datetime", "MM/dd/yyyy HH:mm"))
import org.apache.spark.sql.expressions._
val windowSpec = Window.partitionBy("VendorID").orderBy("timestamp").rangeBetween(-900,900)
val answer = stamp.withColumn("AvgPassenger", avg(stamp("passenger_count")).over(windowSpec))
answer.select("VendorID", "tpep_pickup_datetime", "timestamp", "passenger_count", "AvgPassenger")
answer.createOrReplaceTempView("answerTable")
spark.sqlContext.sql("SELECT timestamp, AvgPassenger FROM answerTable ORDER BY AvgPassenger DESC limit 10").show()

However, this gives me specific dates included in my range instead of the generic time period mentioned above. When I try to drop the MM/dd/yyyy from my timestamp generation, all of my timestamp values become null. Additionally, how can I account for the AM/PM portions of my timestamp?

Any thoughts would be appreciated.

Upvotes: 2

Views: 413

Answers (1)

notNull

Reputation: 31510

We can extract just the hour and minute from the pickup time and pass them through unix_timestamp(..., "HH:mm") to get a generic epoch time value (the time of day mapped onto a single reference day), then use that value in our orderBy clause.

Example:

import org.apache.spark.sql.expressions._
import org.apache.spark.sql.functions._

//sample data
//+--------+---------+---------------+--------------------+
//|VendorID|timestamp|passenger_count|tpep_pickup_datetime|
//+--------+---------+---------------+--------------------+
//|       1|    66180|              3|    12/12/2019 12:23|
//|       1|    66780|              2|    12/13/2018 12:33|
//|       2|    66180|             12|    12/13/2019 12:23|
//|       2|    69780|             13|    12/13/2018 13:23|
//+--------+---------+---------------+--------------------+
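
For testing, a small stand-in for taxiSub can be created directly in a spark-shell session. This is just a sketch built from the illustrative rows above; in the question, taxiSub comes from the CSV load:

import spark.implicits._

//hypothetical in-memory stand-in for the CSV-loaded taxiSub from the question
val taxiSub = Seq(
  (1, 3, "12/12/2019 12:23"),
  (1, 2, "12/13/2018 12:33"),
  (2, 12, "12/13/2019 12:23"),
  (2, 13, "12/13/2018 13:23")
).toDF("VendorID", "passenger_count", "tpep_pickup_datetime")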

val stamp = taxiSub
  .withColumn("tmp", to_timestamp(col("tpep_pickup_datetime"), "MM/dd/yyyy HH:mm"))  //add new timestamp type field
  .withColumn("timestamp", unix_timestamp(concat_ws(":", hour(col("tmp")), minute(col("tmp"))), "HH:mm")) //extract hour, minute and convert to epoch timestamp value
  .drop("tmp")

//partition based on vendorid
val windowSpec = Window.partitionBy("VendorID").orderBy("timestamp").rangeBetween(-900,900)

stamp.withColumn("AvgPassenger", avg(stamp("passenger_count")).over(windowSpec)).show()

//+--------+---------+---------------+--------------------+------------+
//|VendorID|timestamp|passenger_count|tpep_pickup_datetime|AvgPassenger|
//+--------+---------+---------------+--------------------+------------+
//|       1|    66180|              3|    12/12/2019 12:23|         2.5|
//|       1|    66780|              2|    12/13/2018 12:33|         2.5|
//|       2|    66180|             12|    12/13/2019 12:23|        12.0|
//|       2|    69780|             13|    12/13/2018 13:23|        13.0|
//+--------+---------+---------------+--------------------+------------+
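
To cover the AM/PM portion of the format mentioned in the question, only the parsing pattern should need to change: hh (12-hour clock) plus the a AM/PM marker instead of HH. A minimal sketch, assuming the pickup strings look like 04/15/2019 01:32:45 PM (illustrative value, not taken from the actual data):

//same approach, but parse a 12-hour clock with an AM/PM marker
val stampAmPm = taxiSub
  .withColumn("tmp", to_timestamp(col("tpep_pickup_datetime"), "MM/dd/yyyy hh:mm:ss a")) //hh = 12-hour clock, a = AM/PM
  .withColumn("timestamp", unix_timestamp(concat_ws(":", hour(col("tmp")), minute(col("tmp"))), "HH:mm")) //hour() already returns 0-23
  .drop("tmp")

The epoch values produced this way all land on 1970-01-01 in the session time zone, so the absolute numbers differ between time zones, but they stay consistent within one session, which is all the ±900-second rangeBetween needs.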

Upvotes: 1
