Reputation: 333
I'm currently performing some calculations on a large database that contains information on how loans are repaid by various borrowers. From a technical point of view, I'm using PySpark and have just run into the question of how to use advanced filtering operations.
For example, my dataframe looks like this:
Name    ID   ContractDate  LoanSum  Status
Boris   ID3  2022-10-10    10       Closed
Boris   ID3  2022-10-15    10       Active
Boris   ID3  2022-11-22    15       Active
John    ID1  2022-11-05    30       Active
Martin  ID6  2022-12-10    40       Closed
Martin  ID6  2022-12-12    40       Active
Martin  ID6  2022-07-11    40       Active
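For reproducibility, the sample data can be built with something like the sketch below (the column types here are just an assumption for illustration: ContractDate is kept as a plain 'yyyy-MM-dd' string and LoanSum as an integer):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Sample data matching the table above (ContractDate as a plain string)
df = spark.createDataFrame(
    [
        ("Boris",  "ID3", "2022-10-10", 10, "Closed"),
        ("Boris",  "ID3", "2022-10-15", 10, "Active"),
        ("Boris",  "ID3", "2022-11-22", 15, "Active"),
        ("John",   "ID1", "2022-11-05", 30, "Active"),
        ("Martin", "ID6", "2022-12-10", 40, "Closed"),
        ("Martin", "ID6", "2022-12-12", 40, "Active"),
        ("Martin", "ID6", "2022-07-11", 40, "Active"),
    ],
    ["Name", "ID", "ContractDate", "LoanSum", "Status"],
)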
I have to create a dataframe that contains all loans issued by the organization to borrowers (grouped by ID) where the number of days between two loans assigned to one unique ID is 5 or fewer and the LoanSum is the same.
In other words, I have to obtain the following table:
Name    ID   ContractDate  LoanSum  Status
Boris   ID3  2022-10-10    10       Closed
Boris   ID3  2022-10-15    10       Active
Martin  ID6  2022-12-10    40       Closed
Martin  ID6  2022-12-12    40       Active
What should I do in order to run this filtering?
Thank you in advance.
Upvotes: 1
Views: 102
Reputation: 71707
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Create a window spec: compare loans of the same borrower with the same LoanSum
w = Window.partitionBy('Name', 'ID', 'LoanSum').orderBy('ContractDate')

# Day difference to the previous (x) and to the next (y) loan in the window;
# lag with a negative offset looks one row ahead, i.e. it acts like lead
x = F.datediff('ContractDate', F.lag('ContractDate').over(w))
y = F.datediff('ContractDate', F.lag('ContractDate', -1).over(w))

# Boolean condition to keep the rows where the number of days
# between two loans is less than or equal to 5
mask = (F.abs(x) <= 5) | (F.abs(y) <= 5)

# Make sure to convert ContractDate to date type
result = df.withColumn('ContractDate', F.to_date('ContractDate'))

# Filter the rows using the boolean mask
result = result.select('*', mask.alias('mask')).filter('mask').drop('mask')
result.show()
+------+---+------------+-------+------+
| Name| ID|ContractDate|LoanSum|Status|
+------+---+------------+-------+------+
| Boris|ID3| 2022-10-10| 10|Closed|
| Boris|ID3| 2022-10-15| 10|Active|
|Martin|ID6| 2022-12-10| 40|Closed|
|Martin|ID6| 2022-12-12| 40|Active|
+------+---+------------+-------+------+
Upvotes: 1
Reputation: 953
Your DataFrame (df):
+------+---+------------+-------+------+
| Name| ID|ContractDate|LoanSum|Status|
+------+---+------------+-------+------+
| Boris|ID3| 2022-10-10| 10|Closed|
| Boris|ID3| 2022-10-15| 10|Active|
| Boris|ID3| 2022-11-22| 15|Active|
| John|ID1| 2022-11-05| 30|Active|
|Martin|ID6| 2022-12-10| 40|Closed|
|Martin|ID6| 2022-12-12| 40|Active|
|Martin|ID6| 2022-07-11| 40|Active|
+------+---+------------+-------+------+
Importing necessary packages:
from pyspark.sql.window import Window
from pyspark.sql.functions import datediff, lag, array, col, explode
Try this:
# Compare consecutive loans of the same borrower with the same LoanSum
win_spec = Window.partitionBy("ID", "LoanSum").orderBy("ContractDate")

# Gap in days between each loan and the previous one in its partition
df_2 = df \
    .withColumn("PrevDate", lag("ContractDate", 1).over(win_spec)) \
    .withColumn("DateGap", datediff("ContractDate", "PrevDate"))

# Keep the close pairs (gap of 5 days or less) and recover both dates of each pair
df_3 = df_2.filter("DateGap <= 5") \
    .withColumn("ContractDate", explode(array(col("ContractDate"), col("PrevDate")))) \
    .select("Name", "ID", "ContractDate", "LoanSum")

# Join back to get the full rows for those loans
df_final = df_3 \
    .join(df_2, ["Name", "ID", "ContractDate", "LoanSum"]) \
    .drop("PrevDate", "DateGap")

df_final.show()
Output:
+------+---+------------+-------+------+
| Name| ID|ContractDate|LoanSum|Status|
+------+---+------------+-------+------+
| Boris|ID3| 2022-10-10| 10|Closed|
| Boris|ID3| 2022-10-15| 10|Active|
|Martin|ID6| 2022-12-10| 40|Closed|
|Martin|ID6| 2022-12-12| 40|Active|
+------+---+------------+-------+------+
Upvotes: 1