Amit

Reputation: 91

Calculate a value based on the value from the same column in the previous row in Spark

I have to calculate a column using a formula that uses the value calculated for the previous row.

I am unable to figure it out using the withColumn API.

I need to calculate a new column using the formula:

MovingRate = MonthlyRate + (0.7 * MovingRatePrevious)

... where MovingRatePrevious is the MovingRate of the prior row.

For month 1 I already have the value, so I don't need to recalculate it, but I need it to calculate the subsequent rows. The calculation has to be partitioned by Type.
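To pin down what the formula computes, here is a minimal plain-Scala sketch (no Spark) that applies it per Type in Month order. The `Rec` case class, the `movingRates` helper and the sample values are assumptions for illustration only; the values are borrowed from the sample data used in the answers, since the original dataset is only shown as an image.

```scala
// Plain-Scala sketch (no Spark) of the recurrence from the question:
//   MovingRate(m) = MonthlyRate(m) + 0.7 * MovingRate(m - 1)
// Rec, movingRates and the sample rows are hypothetical, for illustration only.
object MovingRateDemo {
  // One input row: month, type key, monthly rate, and the known
  // moving rate (only present for month 1).
  final case class Rec(month: Int, tpe: String, monthlyRate: Double, movingRate: Option[Double])

  val factor = 0.7

  // Computes MovingRate for every row, independently per Type, in Month order.
  def movingRates(rows: Seq[Rec]): Map[String, Seq[(Int, Double)]] =
    rows.groupBy(_.tpe).map { case (tpe, group) =>
      val sorted = group.sortBy(_.month)
      // Seed with the known month-1 value; assumes it is present.
      val start = sorted.head.movingRate.getOrElse(
        throw new IllegalArgumentException(s"no starting MovingRate for type $tpe"))
      // scanLeft keeps every intermediate result, i.e. one MovingRate per month.
      val rates = sorted.tail.scanLeft(start)((prev, r) => r.monthlyRate + factor * prev)
      tpe -> sorted.map(_.month).zip(rates)
    }

  def main(args: Array[String]): Unit = {
    val rows = Seq(
      Rec(1, "blue", 0.4, Some(0.33)),
      Rec(2, "blue", 0.3, None),
      Rec(3, "blue", 0.7, None),
      Rec(4, "blue", 0.9, None),
      Rec(1, "red", 0.5, Some(0.2)),
      Rec(2, "red", 0.6, None),
      Rec(3, "red", 0.8, None)
    )
    for ((tpe, rs) <- movingRates(rows).toSeq.sortBy(_._1); (m, r) <- rs)
      println(f"$tpe%-4s month $m: $r%.5f")
  }
}
```

Each value depends on the one before it, so the scan has to run sequentially within a Type, while different Types are independent of each other.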

This is my original dataset:

[image: original dataset]

Desired results in MovingRate column:

[image: desired MovingRate results]

Upvotes: 3

Views: 1693

Answers (3)

Raphael Roth

Reputation: 27383

Although it's possible to do this with Window functions (see @Leo C's answer), I bet it's more performant to aggregate once per Type using a groupBy and then explode the results of the UDF to get all rows back:

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions._
import spark.implicits._   // assumes a SparkSession named `spark`, as in spark-shell

val df = Seq(
  (1, "blue", 0.4, Some(0.33)),
  (2, "blue", 0.3, None),
  (3, "blue", 0.7, None),
  (4, "blue", 0.9, None)
).toDF("Month", "Type", "MonthlyRate", "MovingRate")

// this udf produces a Seq of Tuple3 (Month, MonthlyRate, MovingRate)
// collect_list gives no ordering guarantee, so the rates are sorted by Month first
val calcMovingRate = udf((startRate: Double, rates: Seq[Row]) => {
  val sorted = rates.sortBy(_.getInt(0))
  sorted.tail.scanLeft((sorted.head.getInt(0), sorted.head.getDouble(1), startRate))(
    // MovingRate = MonthlyRate + 0.7 * previous MovingRate
    (acc, curr) => (curr.getInt(0), curr.getDouble(1), curr.getDouble(1) + 0.7 * acc._3)
  )
})

df
  .groupBy($"Type")
  .agg(
    first($"MovingRate", ignoreNulls = true).as("startRate"),
    collect_list(struct($"Month", $"MonthlyRate")).as("rates")
  )
  .select($"Type", explode(calcMovingRate($"startRate", $"rates")).as("movingRates"))
  .select($"Type", $"movingRates._1".as("Month"), $"movingRates._2".as("MonthlyRate"), $"movingRates._3".as("MovingRate"))
  .show()

gives:

+----+-----+-----------+------------------+
|Type|Month|MonthlyRate|        MovingRate|
+----+-----+-----------+------------------+
|blue|    1|        0.4|              0.33|
|blue|    2|        0.3|0.5309999999999999|
|blue|    3|        0.7|1.0716999999999999|
|blue|    4|        0.9|1.6501899999999998|
+----+-----+-----------+------------------+

Upvotes: 1

Leo C

Reputation: 22449

Given that each moving rate is computed recursively from the previous one, the column-oriented DataFrame API won't shine, especially if the dataset is huge.

That said, if the dataset isn't large, one approach is to have Spark recompute the moving rate row by row with a UDF whose input is the list of monthly rates collected over a Window partitioned by Type (all rows up to the current Month):

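import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._   // assumes a SparkSession named `spark`, as in spark-shell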

val df = Seq(
  (1, "blue", 0.4, Some(0.33)),
  (2, "blue", 0.3, None),
  (3, "blue", 0.7, None),
  (4, "blue", 0.9, None),
  (1, "red", 0.5, Some(0.2)),
  (2, "red", 0.6, None),
  (3, "red", 0.8, None)
).toDF("Month", "Type", "MonthlyRate", "MovingRate")

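// all rows of the same Type, from the first Month up to the current row
val win = Window.partitionBy("Type").orderBy("Month").
  rowsBetween(Window.unboundedPreceding, 0)

// starting from initRate (month 1), fold in each later monthly rate:
// rate = previousRate * factor + monthlyRate
def movingRate(factor: Double) = udf( (initRate: Double, monthlyRates: Seq[Double]) =>
  monthlyRates.tail.foldLeft(initRate)( _ * factor + _ )
)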

df.
  withColumn("MovingRate", when($"Month" === 1, $"MovingRate").otherwise(
    movingRate(0.7)(last($"MovingRate", ignoreNulls=true).over(win), collect_list($"MonthlyRate").over(win))
  )).
  show
// +-----+----+-----------+------------------+
// |Month|Type|MonthlyRate|        MovingRate|
// +-----+----+-----------+------------------+
// |    1| red|        0.5|               0.2|
// |    2| red|        0.6|              0.74|
// |    3| red|        0.8|             1.318|
// |    1|blue|        0.4|              0.33|
// |    2|blue|        0.3|0.5309999999999999|
// |    3|blue|        0.7|1.0716999999999999|
// |    4|blue|        0.9|1.6501899999999998|
// +-----+----+-----------+------------------+
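The UDF above re-folds the whole list of monthly rates for every row, because `collect_list` over the `unboundedPreceding` window hands each row its full prefix. A hedged plain-Scala sketch (no Spark; the `FoldVsScan` object and its helpers are hypothetical names, and the input is the "blue" sample) comparing that per-row fold with a single `scanLeft` pass over one Type's rates:

```scala
// Plain-Scala sketch (no Spark): two ways to evaluate
//   rate = previousRate * factor + monthlyRate
// for the rows of a single Type, already sorted by Month.
// FoldVsScan and its methods are hypothetical names for illustration.
object FoldVsScan {
  // Mirrors the answer's UDF: row n re-folds its whole prefix (the first n
  // monthly rates, as collect_list over the window would supply them).
  // The head of each prefix is month 1, which uses initRate instead.
  def perRowFold(initRate: Double, monthlyRates: Seq[Double], factor: Double): Seq[Double] =
    (1 to monthlyRates.length).map { n =>
      monthlyRates.take(n).tail.foldLeft(initRate)((prev, monthly) => prev * factor + monthly)
    }

  // One pass: scanLeft keeps each intermediate result, one per month.
  // Assumes at least one row (month 1).
  def singleScan(initRate: Double, monthlyRates: Seq[Double], factor: Double): Seq[Double] =
    monthlyRates.tail.scanLeft(initRate)((prev, monthly) => prev * factor + monthly)

  def main(args: Array[String]): Unit = {
    // Hypothetical input: the "blue" monthly rates from the answer, month 1 first.
    val blue = Seq(0.4, 0.3, 0.7, 0.9)
    val folded = perRowFold(0.33, blue, 0.7)
    val scanned = singleScan(0.33, blue, 0.7)
    println(folded)
    println(scanned)
    // Same operations in the same order, so the doubles are identical.
    assert(folded == scanned)
  }
}
```

Both give the same values, but for n rows of one Type the per-row fold performs about n²/2 update steps versus about n for the scan, which is one reason this approach suits smaller datasets.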

Upvotes: 1

Oli

Reputation: 10406

What you are trying to do is compute a recursive formula that looks like:

x[i] = y[i] + 0.7 * x[i-1]

where x[i] is your MovingRate at row i and y[i] your MonthlyRate at row i.
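Unrolling the recurrence shows how far the dependency reaches: x[i] depends on the known month-1 value x[1] and on every y[k] up to row i. This assumes rows are ordered by month within a single Type; the worked line uses the "blue" sample values from the other answers (x[1] = 0.33, y[2] = 0.3, y[3] = 0.7):

```latex
\begin{aligned}
x_i &= y_i + 0.7\,x_{i-1} \\
    &= 0.7^{\,i-1}\,x_1 + \sum_{k=2}^{i} 0.7^{\,i-k}\,y_k, \qquad i \ge 2 \\[4pt]
x_3 &= 0.7^{2}(0.33) + 0.7\,(0.3) + 0.7 = 0.1617 + 0.21 + 0.7 = 1.0717
\end{aligned}
```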

The problem is that this is a purely sequential formula. Each row needs the result of the previous one, which in turn needs the result of the one before. Spark is a parallel computation engine, and it is going to be hard to use it to speed up a calculation that cannot really be parallelized (at best, different Types can be processed in parallel).

Upvotes: 0
