Reputation: 304
I have this dataframe that shows the send time and the open time for each user:
val df = Seq(("user1", "2018-04-05 15:00:00", "2018-04-05 15:50:00"),
("user1", "2018-04-05 16:00:00", "2018-04-05 16:50:00"),
("user1", "2018-04-05 17:00:00", "2018-04-05 17:50:00"),
("user1", "2018-04-05 18:00:00", "2018-04-05 18:50:00"),
("user2", "2018-04-05 15:00:00", "2018-04-05 15:50:00"),
("user2", "2018-04-05 16:00:00", "2018-04-05 16:50:00"),
("user2", "2018-04-05 17:00:00", "2018-04-05 17:50:00"),
("user2", "2018-04-05 17:30:00", "2018-04-05 17:40:00"),
("user2", "2018-04-05 18:00:00", null),
("user2", "2018-04-05 19:00:00", null)
).toDF("id", "sendTime", "openTime")
+-----+-------------------+-------------------+
| id| sendTime| openTime|
+-----+-------------------+-------------------+
|user1|2018-04-05 15:00:00|2018-04-05 15:50:00|
|user1|2018-04-05 16:00:00|2018-04-05 16:50:00|
|user1|2018-04-05 17:00:00|2018-04-05 17:50:00|
|user1|2018-04-05 18:00:00|2018-04-05 18:50:00|
|user2|2018-04-05 15:00:00|2018-04-05 15:50:00|
|user2|2018-04-05 16:00:00|2018-04-05 16:50:00|
|user2|2018-04-05 17:00:00|2018-04-05 17:50:00|
|user2|2018-04-05 17:30:00|2018-04-05 17:40:00|
|user2|2018-04-05 18:00:00| null|
|user2|2018-04-05 19:00:00| null|
+-----+-------------------+-------------------+
Now I want to count, for each user, the number of opens that happened in the two hours before each send time. I used a window function to partition by user, but I couldn't figure out how to compare values from the sendTime column to the openTime column. The result dataframe should look like this:
+-----+-------------------+-------------------+-----+
| id| sendTime| openTime|count|
+-----+-------------------+-------------------+-----+
|user1|2018-04-05 15:00:00|2018-04-05 15:50:00| 0|
|user1|2018-04-05 16:00:00|2018-04-05 16:50:00| 1|
|user1|2018-04-05 17:00:00|2018-04-05 17:50:00| 2|
|user1|2018-04-05 18:00:00|2018-04-05 18:50:00| 2|
|user2|2018-04-05 15:00:00|2018-04-05 15:50:00| 0|
|user2|2018-04-05 16:00:00|2018-04-05 16:50:00| 1|
|user2|2018-04-05 17:00:00|2018-04-05 17:50:00| 2|
|user2|2018-04-05 17:30:00|2018-04-05 17:40:00| 2|
|user2|2018-04-05 18:00:00| null| 3|
|user2|2018-04-05 19:00:00| null| 2|
+-----+-------------------+-------------------+-----+
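For reference, the rule behind these expected counts can be reproduced outside Spark with a plain-Python sketch (illustration only; the helper names are mine, not Spark API): for each send, count the same user's opens that fall strictly inside the two hours before that send.

```python
from datetime import datetime, timedelta

# Sample data from the question: (id, sendTime, openTime)
rows = [
    ("user1", "2018-04-05 15:00:00", "2018-04-05 15:50:00"),
    ("user1", "2018-04-05 16:00:00", "2018-04-05 16:50:00"),
    ("user1", "2018-04-05 17:00:00", "2018-04-05 17:50:00"),
    ("user1", "2018-04-05 18:00:00", "2018-04-05 18:50:00"),
    ("user2", "2018-04-05 15:00:00", "2018-04-05 15:50:00"),
    ("user2", "2018-04-05 16:00:00", "2018-04-05 16:50:00"),
    ("user2", "2018-04-05 17:00:00", "2018-04-05 17:50:00"),
    ("user2", "2018-04-05 17:30:00", "2018-04-05 17:40:00"),
    ("user2", "2018-04-05 18:00:00", None),
    ("user2", "2018-04-05 19:00:00", None),
]

def ts(s):
    return datetime.strptime(s, "%Y-%m-%d %H:%M:%S") if s else None

def count_recent_opens(rows, lookback=timedelta(hours=2)):
    # For each send, count opens by the same user strictly within
    # (send - lookback, send).
    counts = []
    for uid, send, _ in rows:
        s = ts(send)
        n = sum(
            1
            for other_uid, _, open_time in rows
            if other_uid == uid
            and open_time is not None
            and s - lookback < ts(open_time) < s
        )
        counts.append(n)
    return counts

print(count_recent_opens(rows))  # [0, 1, 2, 2, 0, 1, 2, 2, 3, 2]
```

The printed list matches the "count" column of the expected output row by row.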
This is as far as I have got, but it doesn't give me what I need:
var df2 = df
  .withColumn("sendUnix", F.unix_timestamp($"sendTime"))
  .withColumn("openUnix", F.unix_timestamp($"openTime"))
val w = Window.partitionBy($"id").orderBy($"sendUnix").rangeBetween(-2 * 60 * 60, 0)
df2 = df2.withColumn("count", F.count($"openUnix").over(w))
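The frame above is defined over sendUnix only, so count($"openUnix") counts every non-null open attached to any row whose *send* falls inside the frame; the open time is never compared to the current row's sendTime. A plain-Python mirror of that logic (an illustration only, not Spark code; the names are mine) shows what it actually produces on the sample data:

```python
from datetime import datetime, timedelta

# Sample data from the question: (id, sendTime, openTime)
rows = [
    ("user1", "2018-04-05 15:00:00", "2018-04-05 15:50:00"),
    ("user1", "2018-04-05 16:00:00", "2018-04-05 16:50:00"),
    ("user1", "2018-04-05 17:00:00", "2018-04-05 17:50:00"),
    ("user1", "2018-04-05 18:00:00", "2018-04-05 18:50:00"),
    ("user2", "2018-04-05 15:00:00", "2018-04-05 15:50:00"),
    ("user2", "2018-04-05 16:00:00", "2018-04-05 16:50:00"),
    ("user2", "2018-04-05 17:00:00", "2018-04-05 17:50:00"),
    ("user2", "2018-04-05 17:30:00", "2018-04-05 17:40:00"),
    ("user2", "2018-04-05 18:00:00", None),
    ("user2", "2018-04-05 19:00:00", None),
]

def ts(s):
    return datetime.strptime(s, "%Y-%m-%d %H:%M:%S") if s else None

def window_count(rows, lookback=timedelta(hours=2)):
    # Mirrors Window.partitionBy(id).orderBy(sendUnix).rangeBetween(-7200, 0)
    # with count(openUnix): it counts non-null opens on rows whose *send*
    # falls in the frame; the open time itself is never checked.
    counts = []
    for uid, send, _ in rows:
        s = ts(send)
        n = sum(
            1
            for other_uid, other_send, open_time in rows
            if other_uid == uid
            and open_time is not None
            and s - lookback <= ts(other_send) <= s
        )
        counts.append(n)
    return counts

print(window_count(rows))  # [1, 2, 3, 3, 1, 2, 3, 3, 3, 2]
```

Note that each row's own open is counted too (the first row already gets 1 instead of 0), which is why this attempt cannot match the expected output.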
Upvotes: 1
Views: 1221
Reputation: 936
Here you go: collect the opens inside the trailing two-hour window, explode the list, and count the entries within range of each send.
val df1 = df
  .withColumn("sendTimeStamp", unix_timestamp(col("sendTime")))
  .withColumn("openTimeStamp", unix_timestamp(col("openTime")))
val w = Window.partitionBy('id).orderBy('sendTimeStamp).rangeBetween(-7200, 0)
var df2 = df1.withColumn("list", collect_list('openTimeStamp).over(w))
var df3 = df2.select('*, explode('list).as("prevTimeStamp"))
df3.groupBy('id, 'sendTime)
  .agg(max('openTime).as("openTime"),
       sum(when(col("sendTimeStamp").minus(col("prevTimeStamp")).between(0, 7200), 1)
         .otherwise(0)).as("count"))
  .show
Upvotes: 1
Reputation: 1330
This seems quite difficult to do with Window functions alone, because you cannot reference the current row's sendTime as an upper limit when deciding whether a value from openTime falls within the last two hours before that sendTime.
With Spark 2.4 came higher-order functions, which you can read about here (https://docs.databricks.com/_static/notebooks/apache-spark-2.4-functions.html). Using these you can collect all the openTime values within a window using collect_list, then use the higher-order function filter to drop the openTimes that fall outside the two hours prior to the sendTime. Finally, count the values remaining in the list to get the count you are after. Here is my code for doing this.
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
val df = Seq(("user1", "2018-04-05 15:00:00", "2018-04-05 15:50:00"),
("user1", "2018-04-05 16:00:00", "2018-04-05 16:50:00"),
("user1", "2018-04-05 17:00:00", "2018-04-05 17:50:00"),
("user1", "2018-04-05 18:00:00", "2018-04-05 18:50:00"),
("user2", "2018-04-05 15:00:00", "2018-04-05 15:50:00"),
("user2", "2018-04-05 16:00:00", "2018-04-05 16:50:00"),
("user2", "2018-04-05 17:00:00", "2018-04-05 17:50:00"),
("user2", "2018-04-05 17:30:00", "2018-04-05 17:40:00"),
("user2", "2018-04-05 18:00:00", null),
("user2", "2018-04-05 19:00:00", null)
).toDF("id", "sendTime", "openTime")
var df2 = df
  .withColumn("sendUnix", unix_timestamp($"sendTime"))
  .withColumn("openUnix", unix_timestamp($"openTime"))
val w = Window.partitionBy($"id").orderBy($"sendUnix").rangeBetween(-7200, 0)
val df3 = df2.withColumn("opened", collect_list($"openUnix").over(w))
df3.show(false)
+-----+-------------------+-------------------+----------+----------+------------------------------------+
|id |sendTime |openTime |sendUnix |openUnix |opened |
+-----+-------------------+-------------------+----------+----------+------------------------------------+
|user1|2018-04-05 15:00:00|2018-04-05 15:50:00|1522936800|1522939800|[1522939800] |
|user1|2018-04-05 16:00:00|2018-04-05 16:50:00|1522940400|1522943400|[1522943400, 1522939800] |
|user1|2018-04-05 17:00:00|2018-04-05 17:50:00|1522944000|1522947000|[1522947000, 1522943400, 1522939800]|
|user1|2018-04-05 18:00:00|2018-04-05 18:50:00|1522947600|1522950600|[1522950600, 1522947000, 1522943400]|
|user2|2018-04-05 15:00:00|2018-04-05 15:50:00|1522936800|1522939800|[1522939800] |
|user2|2018-04-05 16:00:00|2018-04-05 16:50:00|1522940400|1522943400|[1522943400, 1522939800] |
|user2|2018-04-05 17:00:00|2018-04-05 17:50:00|1522944000|1522947000|[1522947000, 1522943400, 1522939800]|
|user2|2018-04-05 17:30:00|2018-04-05 17:40:00|1522945800|1522946400|[1522946400, 1522947000, 1522943400]|
|user2|2018-04-05 18:00:00|null |1522947600|null |[1522946400, 1522947000, 1522943400]|
|user2|2018-04-05 19:00:00|null |1522951200|null |[1522946400, 1522947000] |
+-----+-------------------+-------------------+----------+----------+------------------------------------+
val df4 = df3.selectExpr("id", "sendTime", "openTime", "sendUnix", "openUnix",
"size(filter(opened, x -> x < sendUnix AND x > sendUnix - 7200)) as count")
df4.show(false)
+-----+-------------------+-------------------+----------+----------+-----+
|id |sendTime |openTime |sendUnix |openUnix |count|
+-----+-------------------+-------------------+----------+----------+-----+
|user1|2018-04-05 15:00:00|2018-04-05 15:50:00|1522936800|1522939800|0 |
|user1|2018-04-05 16:00:00|2018-04-05 16:50:00|1522940400|1522943400|1 |
|user1|2018-04-05 17:00:00|2018-04-05 17:50:00|1522944000|1522947000|2 |
|user1|2018-04-05 18:00:00|2018-04-05 18:50:00|1522947600|1522950600|2 |
|user2|2018-04-05 15:00:00|2018-04-05 15:50:00|1522936800|1522939800|0 |
|user2|2018-04-05 16:00:00|2018-04-05 16:50:00|1522940400|1522943400|1 |
|user2|2018-04-05 17:00:00|2018-04-05 17:50:00|1522944000|1522947000|2 |
|user2|2018-04-05 17:30:00|2018-04-05 17:40:00|1522945800|1522946400|1 |
|user2|2018-04-05 18:00:00|null |1522947600|null |3 |
|user2|2018-04-05 19:00:00|null |1522951200|null |2 |
+-----+-------------------+-------------------+----------+----------+-----+
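As a sanity check, the collect-then-filter mechanics can be mirrored in plain Python (an illustration only, not Spark code; the names are mine): gather opens from rows of the same user whose send falls inside the trailing two-hour frame, then keep only those strictly between sendUnix - 7200 and sendUnix.

```python
from datetime import datetime, timedelta

# Sample data from the question: (id, sendTime, openTime)
rows = [
    ("user1", "2018-04-05 15:00:00", "2018-04-05 15:50:00"),
    ("user1", "2018-04-05 16:00:00", "2018-04-05 16:50:00"),
    ("user1", "2018-04-05 17:00:00", "2018-04-05 17:50:00"),
    ("user1", "2018-04-05 18:00:00", "2018-04-05 18:50:00"),
    ("user2", "2018-04-05 15:00:00", "2018-04-05 15:50:00"),
    ("user2", "2018-04-05 16:00:00", "2018-04-05 16:50:00"),
    ("user2", "2018-04-05 17:00:00", "2018-04-05 17:50:00"),
    ("user2", "2018-04-05 17:30:00", "2018-04-05 17:40:00"),
    ("user2", "2018-04-05 18:00:00", None),
    ("user2", "2018-04-05 19:00:00", None),
]

def ts(s):
    return datetime.strptime(s, "%Y-%m-%d %H:%M:%S") if s else None

def collect_filter_count(rows, lookback=timedelta(hours=2)):
    counts = []
    for uid, send, _ in rows:
        s = ts(send)
        # Step 1 mirrors collect_list(openUnix) over rangeBetween(-7200, 0):
        # gather opens from same-user rows whose *send* is inside the frame.
        opened = [
            ts(open_time)
            for other_uid, other_send, open_time in rows
            if other_uid == uid
            and open_time is not None
            and s - lookback <= ts(other_send) <= s
        ]
        # Step 2 mirrors filter(opened, x -> x < sendUnix AND x > sendUnix - 7200).
        counts.append(sum(1 for o in opened if s - lookback < o < s))
    return counts

print(collect_filter_count(rows))  # [0, 1, 2, 2, 0, 1, 2, 1, 3, 2]
```

This matches the table above row by row. Note the user2 17:30 row comes out as 1 here, while the question's expected output has 2: the 15:50 open belongs to a send outside the trailing frame, so collect_list never gathers it for that row.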
Upvotes: 2