Sandeep

Reputation: 80

Dataframe - groupby access previous to previous record in the groupedDataSet

I have a scenario where I need to compute PreviousPolicyNo based on PolicyNo. The data first needs to be sorted by TransactionDate. Then, for each row, we check whether the same PolicyTerm has appeared before. If so, take the latest PolicyNo available (prior to the TransactionDate) for that PolicyTerm and store it in PreviousPolicyNo. If the PolicyTerm has not appeared before, pick the latest PolicyNo available so far, or set the entry to null if no earlier record exists (first row).

For example,

[Image: example table]

To calculate the second-to-last entry, I had to look for the last value available so far (prior to 12-12-2014) for PolicyTerm 2; similarly, for the last entry, I had to look for the last record available (prior to 12-12-2014) with PolicyTerm 3.

I need help with a Scala implementation. I implemented a UDF (where I collected these columns into a List[Struct]), but it does not perform well on a huge dataset.

Upvotes: 1

Views: 43

Answers (1)

Shaido

Reputation: 28322

Window functions can be used, since they allow for both partitioning and sorting. However, the two conditions need separate windows. Below, w1 corresponds to the condition where the same PolicyTerm has occurred before (hence it is partitioned by PolicyTerm), while w2 picks the latest PolicyNo regardless of PolicyTerm.

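import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{coalesce, lag}
import spark.implicits._ // for the $"col" syntax; assumes a SparkSession named spark

// w1: previous row with the same PolicyTerm; w2: previous row overall
val w1 = Window.partitionBy("PolicyTerm").orderBy("TransactionDate")
val w2 = Window.orderBy("TransactionDate")

val df2 = df.withColumn("LagGroupPolicyNo", lag($"PolicyNo", 1).over(w1))
  .withColumn("LagPolicyNo", lag($"PolicyNo", 1).over(w2))
  // prefer the same-term value; otherwise fall back to the latest PolicyNo overall
  .withColumn("PreviousPolicyNo", coalesce($"LagGroupPolicyNo", $"LagPolicyNo"))
  .drop("LagGroupPolicyNo", "LagPolicyNo")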
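To see how the two windows combine, here is a minimal self-contained sketch that runs the same logic on a tiny in-memory DataFrame. The sample rows, the object name PreviousPolicyExample, and the helper names sample and withPreviousPolicyNo are all made up for illustration (the table from the question is not available), and TransactionDate is assumed to be an ISO-formatted string so that it sorts chronologically.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{coalesce, col, lag}

object PreviousPolicyExample {

  // Hypothetical sample rows (not the data from the question).
  def sample(spark: SparkSession): DataFrame = {
    import spark.implicits._
    Seq(
      ("P1", 1, "2014-01-01"),
      ("P2", 2, "2014-03-01"),
      ("P3", 1, "2014-06-01"),
      ("P4", 3, "2014-09-01"),
      ("P5", 2, "2014-12-12")
    ).toDF("PolicyNo", "PolicyTerm", "TransactionDate")
  }

  // Same approach as above: lag within the PolicyTerm first,
  // then fall back to the lag over all rows.
  def withPreviousPolicyNo(df: DataFrame): DataFrame = {
    val w1 = Window.partitionBy("PolicyTerm").orderBy("TransactionDate")
    val w2 = Window.orderBy("TransactionDate")

    df.withColumn("LagGroupPolicyNo", lag(col("PolicyNo"), 1).over(w1))
      .withColumn("LagPolicyNo", lag(col("PolicyNo"), 1).over(w2))
      .withColumn("PreviousPolicyNo", coalesce(col("LagGroupPolicyNo"), col("LagPolicyNo")))
      .drop("LagGroupPolicyNo", "LagPolicyNo")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("previous-policy-example")
      .getOrCreate()

    // PreviousPolicyNo for P1..P5 in date order: null, P1, P1, P3, P2
    withPreviousPolicyNo(sample(spark)).orderBy("TransactionDate").show()

    spark.stop()
  }
}
```

Here P3 and P5 pick up the earlier PolicyNo with the same PolicyTerm (P1 and P2), while P2 and P4 have no earlier row with their term and fall back to the latest PolicyNo overall. Note that w2 has no partitionBy, so Spark moves all rows to a single partition to evaluate it; on a very large dataset that step may become the bottleneck.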

Upvotes: 1
