J Doe

Reputation: 13

PySpark: get count of rows within a time window

I have some data in this format:

user_id | date         | app_opened
123     | 2018-09-01   | 1
123     | 2018-09-01   | 1
123     | 2018-09-01   | 1
234     | 2018-08-23   | 1
234     | 2018-08-23   | 1
234     | 2018-08-21   | 1
234     | 2018-08-10   | 1

I am trying to get a running count of app opens within each day, and also a count of app opens over the last week, counted back from the current row's date.

This is my required output:

user_id | date         | app_opened | app_open_day | app_open_week
123     | 2018-09-01   | 1          | 1            | 1
123     | 2018-09-01   | 1          | 2            | 2
123     | 2018-09-01   | 1          | 3            | 3
234     | 2018-08-23   | 1          | 1            | 1
234     | 2018-08-23   | 1          | 2            | 2
234     | 2018-08-21   | 1          | 1            | 3
234     | 2018-08-10   | 1          | 1            | 1

I am using window functions in PySpark to get the required output. I get the correct app_open_day count, but not the correct app_open_week count.

Here is my query:

from pyspark.sql import Window
from pyspark.sql import functions as F
from pyspark.sql.functions import col

# For app_open_day
w1 = Window.partitionBy('user_id','date','app_opened').orderBy('date').rowsBetween(Window.unboundedPreceding,0)
df = df.select(col("*"), F.sum('app_opened').over(w1).alias("app_open_day"))

# For app_open_week
days = lambda i: i * 86400
w2 = (Window.partitionBy('user_id','date','app_opened').orderBy('date').rangeBetween(-days(7), 0))
df = df.select(col("*"), F.sum('app_opened').over(w2).alias("app_open_week"))

I can't see where I am going wrong. Please help. Thanks in advance.

Upvotes: 1

Views: 5385

Answers (1)

Ali Yesilli

Reputation: 2200

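Here is a solution for the app_open_week field. It groups each user's rows into 7-day blocks counted back from that user's latest date, then numbers the rows within each block: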

>>> import pyspark.sql.functions as F
>>> from pyspark.sql.window import Window
>>> 
>>> df = sc.parallelize([
...     (123,'2018-09-01',1),
...     (123,'2018-09-01',1),
...     (123,'2018-09-01',1),
...     (234,'2018-08-23',1),
...     (234,'2018-08-23',1),
...     (234,'2018-08-21',1),
...     (234,'2018-08-10',1)
...     ]).toDF(['user_id','date','app_opened'])
>>> 
>>> window1 = Window.partitionBy('user_id')
>>> df = df.withColumn('max_date', F.max('date').over(window1))
>>> df = df.withColumn('date_diff', (F.datediff(F.to_date('max_date'),F.to_date('date'))/7).cast('integer'))
>>> 
>>> window2 = Window.partitionBy('user_id','date_diff').orderBy(F.desc('date'))
>>> df = df.withColumn('app_open_week', F.row_number().over(window2)).select('user_id','date','app_opened','app_open_week')
>>> 
>>> df.sort(["user_id", "date"], ascending=[1, 0]).show()
+-------+----------+----------+-------------+
|user_id|      date|app_opened|app_open_week|
+-------+----------+----------+-------------+
|    123|2018-09-01|         1|            1|
|    123|2018-09-01|         1|            2|
|    123|2018-09-01|         1|            3|
|    234|2018-08-23|         1|            1|
|    234|2018-08-23|         1|            2|
|    234|2018-08-21|         1|            3|
|    234|2018-08-10|         1|            1|
+-------+----------+----------+-------------+
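
To make the logic of the snippet above easier to follow, here is a rough plain-Python sketch of the same three steps (latest date per user, 7-day block index, row number within the block). It is only an illustration and not part of the Spark job: the function name app_open_week and the in-memory rows list are made up for this example, and the dates are datetime.date objects rather than strings.

```python
from collections import defaultdict
from datetime import date

# Hypothetical in-memory copy of the sample data (user_id, date, app_opened).
rows = [
    (123, date(2018, 9, 1), 1),
    (123, date(2018, 9, 1), 1),
    (123, date(2018, 9, 1), 1),
    (234, date(2018, 8, 23), 1),
    (234, date(2018, 8, 23), 1),
    (234, date(2018, 8, 21), 1),
    (234, date(2018, 8, 10), 1),
]


def app_open_week(rows):
    """Return (user_id, date, app_opened, app_open_week) tuples."""
    # Step 1: latest date per user (the max_date column).
    max_date = {}
    for user_id, day, _ in rows:
        max_date[user_id] = max(day, max_date.get(user_id, day))

    # Step 2: group rows by user and 7-day block index (the date_diff column).
    buckets = defaultdict(list)
    for user_id, day, opened in rows:
        block = (max_date[user_id] - day).days // 7
        buckets[(user_id, block)].append((user_id, day, opened))

    # Step 3: number the rows inside each block, newest date first (row_number()).
    result = []
    for members in buckets.values():
        members.sort(key=lambda r: r[1], reverse=True)
        for position, (user_id, day, opened) in enumerate(members, start=1):
            result.append((user_id, day, opened, position))

    # Order like the final sort: user_id ascending, date descending.
    result.sort(key=lambda r: (r[0], -r[1].toordinal()))
    return result


for row in app_open_week(rows):
    print(row)
```

Note that the blocks are fixed relative to each user's latest date, so this is not a rolling 7-day window: two rows one day apart can land in different blocks if they fall on either side of a block boundary. Whether that matches the intended meaning of "last week" depends on the use case.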

Upvotes: 2