lenpyspanacb

Reputation: 333

Advanced filtering in PySpark

Currently I'm performing some calculations on a large database that contains information on how loans are repaid by various borrowers. From a technical point of view I'm using PySpark, and I have just run into the issue of how to use advanced filtering operations.

For example my dataframe looks like this:

Name    ID     ContractDate LoanSum Status
Boris   ID3    2022-10-10   10      Closed 
Boris   ID3    2022-10-15   10      Active
Boris   ID3    2022-11-22   15      Active
John    ID1    2022-11-05   30      Active
Martin  ID6    2022-12-10   40      Closed
Martin  ID6    2022-12-12   40      Active
Martin  ID6    2022-07-11   40      Active
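
For reference, here is a minimal sketch of this sample data as a PySpark DataFrame (it assumes an existing SparkSession named spark; ContractDate is kept as a string here, as in the answers below):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [
        ("Boris",  "ID3", "2022-10-10", 10, "Closed"),
        ("Boris",  "ID3", "2022-10-15", 10, "Active"),
        ("Boris",  "ID3", "2022-11-22", 15, "Active"),
        ("John",   "ID1", "2022-11-05", 30, "Active"),
        ("Martin", "ID6", "2022-12-10", 40, "Closed"),
        ("Martin", "ID6", "2022-12-12", 40, "Active"),
        ("Martin", "ID6", "2022-07-11", 40, "Active"),
    ],
    ["Name", "ID", "ContractDate", "LoanSum", "Status"],
)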

I have to create a dataframe that contains all loans issued by the organization to specific borrowers (grouped by ID) where the gap between two loans assigned to the same ID is 5 days or fewer and the LoanSum is the same.

In other words, I have to obtain the following table:

Name    ID     ContractDate LoanSum Status
Boris   ID3    2022-10-10   10      Closed 
Boris   ID3    2022-10-15   10      Active
Martin  ID6    2022-12-10   40      Closed
Martin  ID6    2022-12-12   40      Active

What should I do to perform this filtering?

Thank you in advance

Upvotes: 1

Views: 102

Answers (2)

Shubham Sharma

Reputation: 71707

Code

# Imports used below
from pyspark.sql import Window
import pyspark.sql.functions as F

# Make sure ContractDate is a date type before taking differences
result = df.withColumn('ContractDate', F.to_date('ContractDate'))

# Create a window spec: one group per borrower and loan amount,
# ordered by contract date
w = Window.partitionBy('Name', 'ID', 'LoanSum').orderBy('ContractDate')

# Gap in days to the previous (x) and next (y) loan in the group
x = F.datediff('ContractDate', F.lag('ContractDate').over(w))
y = F.datediff('ContractDate', F.lag('ContractDate', -1).over(w))

# Keep a row if either neighbouring loan is at most 5 days away
mask = (F.abs(x) <= 5) | (F.abs(y) <= 5)

# Filter the rows using the boolean mask
result = result.select('*', mask.alias('mask')).filter('mask').drop('mask')
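
Here x is the gap in days to the previous loan and y the gap to the next loan within the same Name/ID/LoanSum group, so a row is kept when either neighbour falls within 5 days of it. The non-strict <= 5 comparison is what keeps the 2022-10-10 / 2022-10-15 pair, whose gap is exactly 5 days.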

Result

+------+---+------------+-------+------+
|  Name| ID|ContractDate|LoanSum|Status|
+------+---+------------+-------+------+
| Boris|ID3|  2022-10-10|     10|Closed|
| Boris|ID3|  2022-10-15|     10|Active|
|Martin|ID6|  2022-12-10|     40|Closed|
|Martin|ID6|  2022-12-12|     40|Active|
+------+---+------------+-------+------+

Upvotes: 1

Arud Seka Berne S

Reputation: 953

Your DataFrame (df):

+------+---+------------+-------+------+
|  Name| ID|ContractDate|LoanSum|Status|
+------+---+------------+-------+------+
| Boris|ID3|  2022-10-10|     10|Closed|
| Boris|ID3|  2022-10-15|     10|Active|
| Boris|ID3|  2022-11-22|     15|Active|
|  John|ID1|  2022-11-05|     30|Active|
|Martin|ID6|  2022-12-10|     40|Closed|
|Martin|ID6|  2022-12-12|     40|Active|
|Martin|ID6|  2022-07-11|     40|Active|
+------+---+------------+-------+------+

Importing necessary packages:

from pyspark.sql.window import Window
from pyspark.sql.functions import datediff, lag, array, col, explode

Try this:

# Partition by LoanSum as well, since the question requires the loan
# sums to match; order each group by contract date
win_spec = Window.partitionBy("ID", "LoanSum").orderBy("ContractDate")

df_2 = df \
    .withColumn("PrevDate", lag("ContractDate", 1).over(win_spec)) \
    .withColumn("DateGap", datediff("ContractDate", "PrevDate"))

# DateGap flags only the later loan of each qualifying pair, so explode
# both dates of the pair to recover the earlier row as well; distinct()
# avoids duplicate keys when consecutive pairs share a middle loan
df_3 = df_2.filter("DateGap <= 5") \
    .withColumn("ContractDate", explode(array(col("ContractDate"), col("PrevDate")))) \
    .select("Name", "ID", "ContractDate") \
    .distinct()

# Join back to the full rows and drop the helper columns
df_final = df_3 \
    .join(df_2, ["Name", "ID", "ContractDate"]) \
    .drop("PrevDate", "DateGap")

df_final.show()
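
The idea: DateGap marks only the second loan of each close pair, so exploding the array [ContractDate, PrevDate] emits both contract dates of the pair, and the join back to df_2 recovers the full rows for those dates.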

Output

+------+---+------------+-------+------+
|  Name| ID|ContractDate|LoanSum|Status|
+------+---+------------+-------+------+
| Boris|ID3|  2022-10-10|     10|Closed|
| Boris|ID3|  2022-10-15|     10|Active|
|Martin|ID6|  2022-12-10|     40|Closed|
|Martin|ID6|  2022-12-12|     40|Active|
+------+---+------------+-------+------+

Upvotes: 1
