I have a scenario where I need to compute `PreviousPolicyNo` based on `PolicyNo`. The data first needs to be sorted by `TransactionDate`. Then, for each row, we have to check whether the same `PolicyTerm` has appeared before. If so, take the latest `PolicyNo` from that term (available prior to the row's `TransactionDate`) and write it to `PreviousPolicyNo`. If that `PolicyTerm` has not appeared before, pick the latest `PolicyNo` available so far instead, or mark the entry as null if there is no earlier row at all (the first row).
For example, to calculate the second-to-last entry, I had to look for the last value available so far (prior to 12-12-2014) for `PolicyTerm` 2; similarly, for the last entry, I had to look for the last record available (prior to 12-12-2014) with `PolicyTerm` 3.
I need help with a Scala implementation. I implemented a UDF (collecting these columns into a `List[Struct]`), but it does not perform well on a huge dataset.
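To pin down the rule, it can be written as a single sequential pass over the rows sorted by `TransactionDate`, remembering the latest `PolicyNo` per `PolicyTerm` and the latest one overall. This is a minimal sketch for checking results on small samples, not the UDF from the question and not meant for large data; the `Txn` case class, the `PreviousPolicyReference` object, and the field types (`String` policy numbers, `Int` terms, `java.time.LocalDate` dates) are hypothetical.

```scala
// Hypothetical row type; field names mirror the question's columns.
final case class Txn(policyNo: String, policyTerm: Int, transactionDate: java.time.LocalDate)

object PreviousPolicyReference {
  // Returns (row, previousPolicyNo) pairs in TransactionDate order.
  def previousPolicyNos(rows: Seq[Txn]): Seq[(Txn, Option[String])] = {
    // Sort by date; sortBy is stable, so rows with equal dates keep their input order.
    val sorted = rows.sortBy(_.transactionDate.toEpochDay)
    type State = (Map[Int, String], Option[String], Vector[(Txn, Option[String])])
    val init: State = (Map.empty, None, Vector.empty)
    // One pass, remembering the latest PolicyNo per term and the latest overall.
    val (_, _, out) = sorted.foldLeft(init) { case ((lastByTerm, lastOverall, acc), row) =>
      // Same term seen before: use its latest PolicyNo; otherwise the latest overall (None on the first row).
      val prev = lastByTerm.get(row.policyTerm).orElse(lastOverall)
      (lastByTerm.updated(row.policyTerm, row.policyNo), Some(row.policyNo), acc :+ (row -> prev))
    }
    out
  }
}
```

For instance, hypothetical rows `P1` to `P5` with terms 1, 2, 1, 3, 2 on increasing dates get `None, Some(P1), Some(P1), Some(P3), Some(P2)`: `P2` and `P4` start new terms and fall back to the latest overall value, while `P3` and `P5` reuse their own term's previous policy.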
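Window functions can be used here, since they support both partitioning and ordering. However, the two conditions need separate windows. Below, `w1` handles the case where the same `PolicyTerm` has occurred before (hence it is partitioned by `PolicyTerm`), while `w2` picks the latest `PolicyNo` regardless of `PolicyTerm`. `coalesce` then takes the `w1` value when it exists and falls back to the `w2` value otherwise; on the very first row both are null, so the result is null. Note that `w2` has no `partitionBy`, so Spark moves all rows into a single partition to evaluate it (and logs a warning saying so), which can become the bottleneck on a very large dataset.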
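To see what the two windows compute, here is a plain-Scala emulation (no Spark) of `lag` over each window followed by `coalesce`. It is a sketch for intuition only: the `PolicyRow` case class, the `WindowEmulation` object, the `lag1` helper, and the use of ISO-8601 date strings are hypothetical, and each row is tracked by its position after sorting.

```scala
// Hypothetical row type; ISO-8601 date strings ("yyyy-MM-dd") sort correctly as plain strings.
final case class PolicyRow(policyNo: String, policyTerm: Int, transactionDate: String)

object WindowEmulation {
  // lag(policyNo, 1) over (row, index) pairs already in window order:
  // each index is paired with the previous row's policyNo, or None for the first row.
  private def lag1(rows: Seq[(PolicyRow, Int)]): Seq[(Int, Option[String])] =
    rows.map(_._2).zip(None +: rows.map { case (r, _) => Some(r.policyNo) })

  def previousPolicyNos(rows: Seq[PolicyRow]): Seq[(PolicyRow, Option[String])] = {
    // Sort once by date; the index identifies each row across both "windows".
    val sorted = rows.sortBy(_.transactionDate).zipWithIndex
    // Like w2: no partitioning, ordered by date.
    val lagAll: Map[Int, Option[String]] = lag1(sorted).toMap
    // Like w1: partitioned by policyTerm; groupBy keeps the date order inside each group.
    val lagGroup: Map[Int, Option[String]] =
      sorted.groupBy(_._1.policyTerm).values.flatMap(g => lag1(g)).toMap
    // Like coalesce(LagGroupPolicyNo, LagPolicyNo): group value first, else the overall one.
    sorted.map { case (r, i) => r -> lagGroup(i).orElse(lagAll(i)) }
  }
}
```

The design mirrors the answer: each "window" yields one lagged value per row, and `Option.orElse` plays the role of `coalesce`, preferring the per-term value.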
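```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{coalesce, lag}
import spark.implicits._ // for the $"col" syntax; assumes a SparkSession named spark

// w1: previous row within the same PolicyTerm; w2: previous row overall, by date
val w1 = Window.partitionBy("PolicyTerm").orderBy("TransactionDate")
val w2 = Window.orderBy("TransactionDate")
val df2 = df.withColumn("LagGroupPolicyNo", lag($"PolicyNo", 1).over(w1))
  .withColumn("LagPolicyNo", lag($"PolicyNo", 1).over(w2))
  // same-term value if it exists, else the overall previous value (null on the first row)
  .withColumn("PreviousPolicyNo", coalesce($"LagGroupPolicyNo", $"LagPolicyNo"))
  .drop("LagGroupPolicyNo", "LagPolicyNo")
```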