lenpyspanacb

Reputation: 333

Complex Filtering Operations in PySpark

Currently I'm performing calculations on a database that contains information on how loans are repaid by borrowers. It is a huge dataset, so I'm using PySpark, and I have just run into an issue with how to use advanced filtering operations.

My dataframe looks like this:

Name    ID     ContractDate LoanSum ClosingDate
A       ID1    2022-10-10   10      2022-10-15 
A       ID1    2022-10-15   15      null
A       ID1    2022-10-30   20      2022-11-10
B       ID2    2022-11-11   15      2022-10-14
B       ID2    2022-12-10   30      null
B       ID2    2022-12-12   35      2022-12-14
C       ID3    2022-12-19   19      2022-11-10
D       ID4    2022-12-10   10      null
D       ID4    2022-12-12   40      2022-11-29

My goal is to create a dataframe that contains all loans issued to specific borrowers (grouped by unique ID) where the first loan is not yet closed but the next loan has already been given to the borrower, and the difference between the loan sums is less than or equal to 5.

In other words, I have to obtain the following table (expected result):

Name    ID     ContractDate LoanSum ClosingDate
A       ID1    2022-10-15   15      null
A       ID1    2022-10-30   20      2022-11-10
B       ID2    2022-12-10   30      null
B       ID2    2022-12-12   35      2022-12-14

Thank you in advance

Upvotes: 0

Views: 111

Answers (1)

Madiha Khalid

Reputation: 380

You can use PySpark window functions partitioned by the unique ID. To check whether the next loan is already closed, use the lead function; similarly, lag gets the previous record's value within the partition.

In this example, I use lead and lag together to make sure that both criteria are met on both rows of a pair. This makes it easy to build a check column and keep rows based on that check.
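For intuition, here is a minimal, self-contained sketch of what lead and lag return over a partition (toy column names, not the question's schema):

from pyspark.sql import SparkSession, Window
import pyspark.sql.functions as f

spark = SparkSession.builder.appName('leadlag-demo').getOrCreate()

# Toy data: one partition key 'grp', ordered by 'seq'.
demo = spark.createDataFrame(
    [('x', 1, 'a'), ('x', 2, 'b'), ('x', 3, 'c')],
    ['grp', 'seq', 'val'])

w = Window.partitionBy('grp').orderBy('seq')

demo.select(
    'seq', 'val',
    f.lag('val').over(w).alias('prev_val'),   # previous row's value; null on the first row
    f.lead('val').over(w).alias('next_val'),  # next row's value; null on the last row
).show()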

Check this solution.

from pyspark.sql import SparkSession, Window
import pyspark.sql.functions as f


data = [
    ('A', 'ID1', '2022-10-10', 10, '2022-10-15'),
    ('A', 'ID1', '2022-10-15', 15, 'null'),
    ('A', 'ID1', '2022-10-30', 20, '2022-11-10'),
    ('B', 'ID2', '2022-11-11', 15, '2022-10-14'),
    ('B', 'ID2', '2022-12-10', 30, 'null'),
    ('B', 'ID2', '2022-12-12', 35, '2022-12-14'),
    ('C', 'ID3', '2022-12-19', 19, '2022-11-10'),
    ('D', 'ID4', '2022-12-10', 10, 'null'),
    ('D', 'ID4', '2022-12-12', 40, '2022-11-29')]

cols = ['Name', 'ID', 'ContractDate', 'LoanSum', 'ClosingDate']
spark = SparkSession.builder.appName('test').getOrCreate()
df = spark.createDataFrame(data, cols)

print('Input---->')
df.show()

# One window per borrower, ordered by contract date (ISO date strings sort correctly).
w = Window.partitionBy('ID').orderBy('ContractDate')

# Values from the neighbouring rows; in this sample ClosingDate holds the literal string 'null'.
next_record = f.lead(f.col('ClosingDate')).over(w).alias('next_ClosingDate')
prev_record = f.lag(f.col('ClosingDate')).over(w).alias('prev_ClosingDate')

next_loan_sum = f.lead(f.col('LoanSum')).over(w).alias('next_LoanSum')
prev_loan_sum = f.lag(f.col('LoanSum')).over(w).alias('prev_LoanSum')

print('Output---->')
df.withColumn(
    'check',
    # Keep a row if it is an open loan followed by a later loan,
    # or a loan whose immediate predecessor is still open.
    f.when(
        ((f.col('ClosingDate') == 'null') & (next_record != 'null')) |
        ((f.col('ClosingDate') != 'null') & (prev_record == 'null')), 1).otherwise(0)
).withColumn(
    'loan_sum_check',
    # The difference between the two loan sums must be <= 5, in either direction of the pair.
    f.when(((next_loan_sum - f.col('LoanSum')) <= 5) |
           ((f.col('LoanSum') - prev_loan_sum) <= 5), 1).otherwise(0)
).filter('check = 1 and loan_sum_check = 1').drop('check', 'loan_sum_check').show()

Results Output


Input---->
+----+---+------------+-------+-----------+
|Name| ID|ContractDate|LoanSum|ClosingDate|
+----+---+------------+-------+-----------+
|   A|ID1|  2022-10-10|     10| 2022-10-15|
|   A|ID1|  2022-10-15|     15|       null|
|   A|ID1|  2022-10-30|     20| 2022-11-10|
|   B|ID2|  2022-11-11|     15| 2022-10-14|
|   B|ID2|  2022-12-10|     30|       null|
|   B|ID2|  2022-12-12|     35| 2022-12-14|
|   C|ID3|  2022-12-19|     19| 2022-11-10|
|   D|ID4|  2022-12-10|     10|       null|
|   D|ID4|  2022-12-12|     40| 2022-11-29|
+----+---+------------+-------+-----------+

Output---->
+----+---+------------+-------+-----------+
|Name| ID|ContractDate|LoanSum|ClosingDate|
+----+---+------------+-------+-----------+
|   A|ID1|  2022-10-15|     15|       null|
|   A|ID1|  2022-10-30|     20| 2022-11-10|
|   B|ID2|  2022-12-10|     30|       null|
|   B|ID2|  2022-12-12|     35| 2022-12-14|
+----+---+------------+-------+-----------+
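
One note on null handling: this sample stores the literal string 'null' in ClosingDate, so the checks compare against that string. If your real ClosingDate column contains actual nulls, the same pairing logic should use isNull()/isNotNull() instead; a minimal sketch under that assumption:

from pyspark.sql import Window
import pyspark.sql.functions as f

# Sketch: ClosingDate assumed here to be a genuinely nullable column.
w = Window.partitionBy('ID').orderBy('ContractDate')

check = f.when(
    (f.col('ClosingDate').isNull() & f.lead('ClosingDate').over(w).isNotNull()) |
    (f.col('ClosingDate').isNotNull() & f.lag('ClosingDate').over(w).isNull()),
    1
).otherwise(0)

# df.withColumn('check', check) and then apply the same LoanSum difference filter as above.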

Upvotes: 1
