akp

Reputation: 53

How to count the number of previous occurrences in PySpark

I have the following PySpark DataFrame.

cust_id  apply_date
1        01-06-2014
1        01-07-2014
1        01-04-2018
1        01-07-2018
2        01-04-2015
2        01-05-2015
2        01-06-2015
2        01-09-2015

I want to calculate the number of applications the customer made in the previous 6 months excluding the current application.

So the output should be:

cust_id  apply_date  apps_prev_180_days
1        01-06-2014  0
1        01-07-2014  1
1        01-04-2018  0
1        01-07-2018  1
2        01-04-2015  0
2        01-05-2015  1
2        01-06-2015  2
2        01-09-2015  3

I have tried creating a lag variable within a window function and counting the apps, but that only considers the immediately previous application, not all applications in the window. Any pointers on how to do this?
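For clarity, here is the desired logic sketched in plain Python (no Spark): for each row, count that customer's earlier applications falling within the previous 180 days. The function name `apps_prev_180_days` and the row layout are illustrative assumptions, not part of the question.

```python
from datetime import datetime, timedelta
from collections import defaultdict

def apps_prev_180_days(rows):
    """For each (cust_id, apply_date) row, count that customer's earlier
    applications strictly before this date and at most 180 days old.
    Assumes rows arrive sorted by date within each customer."""
    by_cust = defaultdict(list)  # cust_id -> list of earlier dates seen so far
    out = []
    for cust_id, apply_date in rows:
        d = datetime.strptime(apply_date, "%d-%m-%Y")
        earlier = by_cust[cust_id]
        count = sum(1 for e in earlier if d - timedelta(days=180) <= e < d)
        out.append((cust_id, apply_date, count))
        earlier.append(d)
    return out

rows = [
    (1, "01-06-2014"), (1, "01-07-2014"), (1, "01-04-2018"), (1, "01-07-2018"),
    (2, "01-04-2015"), (2, "01-05-2015"), (2, "01-06-2015"), (2, "01-09-2015"),
]
```

Running this over the sample rows reproduces the expected `apps_prev_180_days` column above; the Spark answers below express the same window with `rangeBetween`.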

Upvotes: 1

Views: 477

Answers (2)

Steven

Reputation: 15258

Try this:

from pyspark.sql import functions as F, Window as W

# parse the dd-MM-yyyy strings into a proper date column first
df = df.withColumn("apply_date", F.to_date("apply_date", "dd-MM-yyyy"))

df.withColumn(
    "apps_prev_180_days",
    F.count("*").over(
        W.partitionBy("cust_id")
        .orderBy(F.unix_timestamp("apply_date"))  # date as unix timestamp (seconds)
        .rangeBetween(-(180 * 24 * 3600), -1)  # 180 days in seconds, excluding the current row
    ),
).show()

+-------+----------+------------------+
|cust_id|apply_date|apps_prev_180_days|
+-------+----------+------------------+
|      1|2014-06-01|                 0|
|      1|2014-07-01|                 1|
|      1|2018-04-01|                 0|
|      1|2018-07-01|                 1|
|      2|2015-04-01|                 0|
|      2|2015-05-01|                 1|
|      2|2015-06-01|                 2|
|      2|2015-09-01|                 3|
+-------+----------+------------------+

Upvotes: 1

Ric S

Reputation: 9247

We can use a window function with the method .rangeBetween (expressed in seconds) to define custom time intervals.

import pyspark.sql.functions as F
from pyspark.sql.window import Window

# convert string type to actual date (if not done yet)
df = df.withColumn('apply_date', F.to_date('apply_date', 'dd-MM-yyyy'))

# number of days and seconds for time interval
days = 180
unix_seconds = days * 86400

# define window
w = Window\
  .partitionBy('cust_id')\
  .orderBy(F.col('apply_date').cast('timestamp').cast('long'))\
  .rangeBetween(-unix_seconds, -1)

df = df.withColumn('apps_prev_180_days', F.count('cust_id').over(w))

df.show()

+-------+----------+------------------+
|cust_id|apply_date|apps_prev_180_days|
+-------+----------+------------------+
|      1|2014-06-01|                 0|
|      1|2014-07-01|                 1|
|      1|2018-04-01|                 0|
|      1|2018-07-01|                 1|
|      2|2015-04-01|                 0|
|      2|2015-05-01|                 1|
|      2|2015-06-01|                 2|
|      2|2015-09-01|                 3|
+-------+----------+------------------+

In this case, .rangeBetween(-unix_seconds, -1) defines a time interval from 180 days ago up to the previous second (-1); this excludes the current application from the count.

Upvotes: 2
