Pooya

Reputation: 304

Spark Window function using more than one column

I have this dataframe that shows the send time and the open time for each user:

val df = Seq(("user1", "2018-04-05 15:00:00", "2018-04-05 15:50:00"),
             ("user1", "2018-04-05 16:00:00", "2018-04-05 16:50:00"),
             ("user1", "2018-04-05 17:00:00", "2018-04-05 17:50:00"),
             ("user1", "2018-04-05 18:00:00", "2018-04-05 18:50:00"),
             ("user2", "2018-04-05 15:00:00", "2018-04-05 15:50:00"),
             ("user2", "2018-04-05 16:00:00", "2018-04-05 16:50:00"),
             ("user2", "2018-04-05 17:00:00", "2018-04-05 17:50:00"),
             ("user2", "2018-04-05 17:30:00", "2018-04-05 17:40:00"),             
             ("user2", "2018-04-05 18:00:00", null),
             ("user2", "2018-04-05 19:00:00", null)              
            ).toDF("id", "sendTime", "openTime")

+-----+-------------------+-------------------+
|   id|           sendTime|           openTime|
+-----+-------------------+-------------------+
|user1|2018-04-05 15:00:00|2018-04-05 15:50:00|
|user1|2018-04-05 16:00:00|2018-04-05 16:50:00|
|user1|2018-04-05 17:00:00|2018-04-05 17:50:00|
|user1|2018-04-05 18:00:00|2018-04-05 18:50:00|
|user2|2018-04-05 15:00:00|2018-04-05 15:50:00|
|user2|2018-04-05 16:00:00|2018-04-05 16:50:00|
|user2|2018-04-05 17:00:00|2018-04-05 17:50:00|
|user2|2018-04-05 17:30:00|2018-04-05 17:40:00|
|user2|2018-04-05 18:00:00|               null|
|user2|2018-04-05 19:00:00|               null|
+-----+-------------------+-------------------+

Now I want to count, for each user, the number of opens that happened in the two hours before each send time. I used a window function partitioned by user, but I couldn't figure out how to compare values from the sendTime column against the openTime column. The result dataframe should look like this:

+-----+-------------------+-------------------+-----+
|   id|           sendTime|           openTime|count|
+-----+-------------------+-------------------+-----+
|user1|2018-04-05 15:00:00|2018-04-05 15:50:00|    0|
|user1|2018-04-05 16:00:00|2018-04-05 16:50:00|    1|
|user1|2018-04-05 17:00:00|2018-04-05 17:50:00|    2|
|user1|2018-04-05 18:00:00|2018-04-05 18:50:00|    2|
|user2|2018-04-05 15:00:00|2018-04-05 15:50:00|    0|
|user2|2018-04-05 16:00:00|2018-04-05 16:50:00|    1|
|user2|2018-04-05 17:00:00|2018-04-05 17:50:00|    2|
|user2|2018-04-05 17:30:00|2018-04-05 17:40:00|    2|
|user2|2018-04-05 18:00:00|               null|    3|
|user2|2018-04-05 19:00:00|               null|    2|
+-----+-------------------+-------------------+-----+

This is as far as I have got, but it doesn't give me what I need:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.{functions => F}

var df2 = df.withColumn("sendUnix", F.unix_timestamp($"sendTime"))
            .withColumn("openUnix", F.unix_timestamp($"openTime"))
val w = Window.partitionBy($"id").orderBy($"sendUnix").rangeBetween(-2*60*60, 0)
df2 = df2.withColumn("count", F.count($"openUnix").over(w))

Upvotes: 1

Views: 1221

Answers (2)

deo

Reputation: 936

Here you go. The idea is to convert both timestamps to unix seconds, collect each user's opens over a two-hour range window keyed on the send time, explode that list, and then count only the opens that fall within the 7200 seconds before each send:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val df1 = df.withColumn("sendTimeStamp", unix_timestamp(col("sendTime")))
            .withColumn("openTimeStamp", unix_timestamp(col("openTime")))

// all sends of the same user within the two hours up to the current send
val w = Window.partitionBy('id).orderBy('sendTimeStamp).rangeBetween(-7200, 0)

// collect the opens visible in that window and explode to one (send, previous open) pair per row
val df2 = df1.withColumn("list", collect_list('openTimeStamp).over(w))
val df3 = df2.select('*, explode('list).as("prevTimeStamp"))

// keep only the opens that happened in the 7200 seconds before the send
df3.groupBy('id, 'sendTime)
  .agg(max('openTime).as("openTime"),
       sum(when(col("sendTimeStamp").minus(col("prevTimeStamp")).between(0, 7200), 1).otherwise(0)).as("count"))
  .show

Upvotes: 1

randal25

Reputation: 1330

This seems quite difficult to do with window functions alone, because you cannot reference the current row's sendTime (the upper limit of the range) when deciding whether a given openTime falls within the two hours before it.

Spark 2.4 introduced higher-order functions, which you can read about here (https://docs.databricks.com/_static/notebooks/apache-spark-2.4-functions.html). Using these you can collect all the openTime values within a window with collect_list, then use the higher-order function filter to drop the openTimes that fall outside the two hours before the sendTime. Finally, size gives you the count of the values remaining in the list. Here is my code for doing this.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val df = Seq(("user1", "2018-04-05 15:00:00", "2018-04-05 15:50:00"),
             ("user1", "2018-04-05 16:00:00", "2018-04-05 16:50:00"),
             ("user1", "2018-04-05 17:00:00", "2018-04-05 17:50:00"),
             ("user1", "2018-04-05 18:00:00", "2018-04-05 18:50:00"),
             ("user2", "2018-04-05 15:00:00", "2018-04-05 15:50:00"),
             ("user2", "2018-04-05 16:00:00", "2018-04-05 16:50:00"),
             ("user2", "2018-04-05 17:00:00", "2018-04-05 17:50:00"),
             ("user2", "2018-04-05 17:30:00", "2018-04-05 17:40:00"),             
             ("user2", "2018-04-05 18:00:00", null),
             ("user2", "2018-04-05 19:00:00", null)              
            ).toDF("id", "sendTime", "openTime")

val df2 = df.withColumn("sendUnix", unix_timestamp($"sendTime"))
            .withColumn("openUnix", unix_timestamp($"openTime"))

// look back over this user's sends in the two hours up to the current send;
// descending order means the most recent opens appear first in the collected list
val w = Window.partitionBy($"id").orderBy($"sendUnix".desc).rangeBetween(0, 2 * 60 * 60)

val df3 = df2.withColumn("opened", collect_list($"openUnix").over(w))

df3.show(false)

+-----+-------------------+-------------------+----------+----------+------------------------------------+
|id   |sendTime           |openTime           |sendUnix  |openUnix  |opened                              |
+-----+-------------------+-------------------+----------+----------+------------------------------------+
|user1|2018-04-05 15:00:00|2018-04-05 15:50:00|1522936800|1522939800|[1522939800]                        |
|user1|2018-04-05 16:00:00|2018-04-05 16:50:00|1522940400|1522943400|[1522943400, 1522939800]            |
|user1|2018-04-05 17:00:00|2018-04-05 17:50:00|1522944000|1522947000|[1522947000, 1522943400, 1522939800]|
|user1|2018-04-05 18:00:00|2018-04-05 18:50:00|1522947600|1522950600|[1522950600, 1522947000, 1522943400]|
|user2|2018-04-05 15:00:00|2018-04-05 15:50:00|1522936800|1522939800|[1522939800]                        |
|user2|2018-04-05 16:00:00|2018-04-05 16:50:00|1522940400|1522943400|[1522943400, 1522939800]            |
|user2|2018-04-05 17:00:00|2018-04-05 17:50:00|1522944000|1522947000|[1522947000, 1522943400, 1522939800]|
|user2|2018-04-05 17:30:00|2018-04-05 17:40:00|1522945800|1522946400|[1522946400, 1522947000, 1522943400]|
|user2|2018-04-05 18:00:00|null               |1522947600|null      |[1522946400, 1522947000, 1522943400]|
|user2|2018-04-05 19:00:00|null               |1522951200|null      |[1522946400, 1522947000]            |
+-----+-------------------+-------------------+----------+----------+------------------------------------+

val df4 = df3.selectExpr("id", "sendTime", "openTime", "sendUnix", "openUnix",
  "size(filter(opened, x -> x < sendUnix AND x > sendUnix - 7200)) as count")

df4.show(false)

+-----+-------------------+-------------------+----------+----------+-----+
|id   |sendTime           |openTime           |sendUnix  |openUnix  |count|
+-----+-------------------+-------------------+----------+----------+-----+
|user1|2018-04-05 15:00:00|2018-04-05 15:50:00|1522936800|1522939800|0    |
|user1|2018-04-05 16:00:00|2018-04-05 16:50:00|1522940400|1522943400|1    |
|user1|2018-04-05 17:00:00|2018-04-05 17:50:00|1522944000|1522947000|2    |
|user1|2018-04-05 18:00:00|2018-04-05 18:50:00|1522947600|1522950600|2    |
|user2|2018-04-05 15:00:00|2018-04-05 15:50:00|1522936800|1522939800|0    |
|user2|2018-04-05 16:00:00|2018-04-05 16:50:00|1522940400|1522943400|1    |
|user2|2018-04-05 17:00:00|2018-04-05 17:50:00|1522944000|1522947000|2    |
|user2|2018-04-05 17:30:00|2018-04-05 17:40:00|1522945800|1522946400|1    |
|user2|2018-04-05 18:00:00|null               |1522947600|null      |3    |
|user2|2018-04-05 19:00:00|null               |1522951200|null      |2    |
+-----+-------------------+-------------------+----------+----------+-----+
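
If you happen to be on Spark 3.0 or later (an assumption; the selectExpr version above does not need it), the same filter can also be written with the typed Column DSL, since the higher-order function filter was added to org.apache.spark.sql.functions in 3.0. A minimal sketch, with df4b just an illustrative name:

import org.apache.spark.sql.functions.{filter, size, col}

// Spark 3.0+ only: same predicate as the selectExpr variant, expressed with Column functions
val df4b = df3.withColumn("count",
  size(filter(col("opened"), x => x < col("sendUnix") && x > col("sendUnix") - 7200)))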

Upvotes: 2
