Nakeuh

Reputation: 1909

Spark DataFrame: Custom Function on Window

I have a DataFrame df with a column that contains Double values.

I have managed to compute a sliding average in the following way:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._ // for the $"..." column syntax

val myWindow = Window.partitionBy("id")
  .orderBy(asc("timestamp"))
  .rowsBetween(-2, 2)

val newDf = df.withColumn("slidingAvg", avg($"value").over(myWindow))

But now I want to improve this computation a bit. Instead of this basic sliding average, I want to compute a 'squared sliding root average' (values closer to 0 should have more influence on the result than larger positive or negative values).

Here is my attempt so far:

val myWindow = Window.partitionBy("id")
  .orderBy(asc("timestamp"))
  .rowsBetween(-2, 2)

val newDf = df.withColumn("squaredSlidingRootAverage",
  avg(sqrt($"value")).over(myWindow) * avg(sqrt($"value")).over(myWindow))

This does the trick when I only have positive values, but with negative values it does not give me the expected result (because of the sqrt). I would like 'something' that does this calculation when there are only positive values:

avg(sqrt($"value")).over(myWindow) * avg(sqrt($"value")).over(myWindow)

This when there are only negative values:

- avg(- sqrt(abs($"value"))).over(myWindow) * avg(- sqrt(abs($"value"))).over(myWindow)

And a mix of both when positive and negative values are mixed in the window.
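
In other words, I want to apply a signed square root f(x) = sign(x) * sqrt(|x|) to each value, take the sliding average m of those transformed values over the window, and return sign(m) * m^2.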

So what I am trying to implement is something that does this:

val myWindow = Window.partitionBy("id")
  .orderBy(asc("timestamp"))
  .rowsBetween(-2, 2)

val newDf = df.withColumn("squaredSlidingRootAverage",
           [AVG_SIGN] * avg([VALUE_SIGN] * sqrt(abs($"value"))).over(myWindow) * 
                        avg([VALUE_SIGN] * sqrt(abs($"value"))).over(myWindow))

How can I implement this?

Input Sample:

+---+----------+------+
| id| timestamp| value|
+---+----------+------+
|  0|         0|     0|
|  0|         1|     0|
|  0|         2|     0|
|  0|         3|     0|
|  0|         4|    25|
|  0|         5|  -100|
|  0|         6|   -25|
|  0|         7|     9|
|  0|         8|     0|
|  0|         9|     0|
|  0|        10|   -25|
|  0|        11|   100|
|  0|        12|     0|
|  0|        13|     0|
|  0|        14|     0|
|  0|        15|     0|
+---+----------+------+

Output Sample:

+---+----------+------+
| id| timestamp| value|
+---+----------+------+
|  0|         0|     0|
|  0|         1|     0|
|  0|         2|     1|
|  0|         3|    -1|
|  0|         4|    -4|
|  0|         5| -1.96|
|  0|         6| -1.96|
|  0|         7| -5.76|
|  0|         8| -1.96|
|  0|         9|  2.56|
|  0|        10|     1|
|  0|        11|     1|
|  0|        12|     1|
|  0|        13|     4|
|  0|        14|     0|
|  0|        15|     0|
+---+----------+------+
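
For reference, the value -1.96 at timestamp 5 comes from the window [0, 25, -100, -25, 9]: the signed square roots are [0, 5, -10, -5, 3], their average is -1.4, and squaring that average while keeping its sign gives -1.96.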

Upvotes: 1

Views: 1036

Answers (1)

Oli

Reputation: 10406

You can use the when and otherwise functions to handle your two different cases:

df
    .withColumn("sqrt", when('value < 0, -sqrt(-'value)).otherwise(sqrt('value))) // signed square root of each value
    .withColumn("avg", avg('sqrt) over myWindow)                                  // sliding average of the signed roots
    .withColumn("value", when('avg < 0, -'avg * 'avg).otherwise('avg * 'avg))     // square the average, keeping its sign
    .select("id", "timestamp", "value")
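
If you prefer to avoid the conditionals, the same signed-root / signed-square logic can also be written with signum and abs. This is just a sketch of an equivalent formulation, assuming the same myWindow definition and the spark.implicits._ import for the 'value column syntax:

import org.apache.spark.sql.functions.{abs, avg, signum, sqrt}

df
    .withColumn("sqrt", signum('value) * sqrt(abs('value))) // signed square root of each value
    .withColumn("avg", avg('sqrt) over myWindow)             // sliding average of the signed roots
    .withColumn("value", signum('avg) * 'avg * 'avg)         // square the average, keeping its sign
    .select("id", "timestamp", "value")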

Upvotes: 1
