
Reputation: 45

Spark: Get max consecutive decrease in value

My requirement is to get the maximum count of consecutive decreases in the values.

Below is my input dataset:

+---+-------+
| id| amount|
+---+-------+
|  1|   10.0|
|  1|    9.0|
|  1|    7.0|
|  1|    6.0|
|  2|   50.0|
|  2|   60.0|
|  2|   70.0|
|  3|   90.0|
|  3|   80.0|
|  3|   90.0|
+---+-------+

and my required outcome is as follows:

+---+--------+
| id| outcome|
+---+--------+
|  1|       3|
|  2|       0|
|  3|       2|
+---+--------+

My outcome (new column) is computed per id: how many times the value has decreased consecutively, capped at 3. For id 1, even though it decreased 4 times, I only want a maximum of 3.

Any suggestion or help in Spark SQL or the Spark DataFrame API (Scala) will be greatly appreciated.

Upvotes: 2

Views: 328

Answers (2)

Oli

Reputation: 10406

You first need an ordering column to compute the decreases. Your example has none, so we can build an index column with monotonically_increasing_id. Then we can use a window with the lag and lead functions to get what you want:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._ // for the 'amount symbol syntax

val win = Window.partitionBy("id").orderBy("index")

df
    .withColumn("index", monotonically_increasing_id())
    // a row takes part in a decrease if its amount is less than
    // the previous one or greater than the next one
    .withColumn("decrease", (lag('amount, 1).over(win) > 'amount) ||
                            (lead('amount, 1).over(win) < 'amount)
    )
    .groupBy("id")
    // cast the boolean to an int so it can be summed
    .agg(sum('decrease cast "int") as "outcome")
    // cap the outcome at 3
    .withColumn("outcome", when('outcome > 3, lit(3)).otherwise('outcome))
    .orderBy("id")
    .show
+---+-------+                                                                   
| id|outcome|
+---+-------+
|  1|      3|
|  2|      0|
|  3|      2|
+---+-------+

Upvotes: 2
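Outside Spark, the lag/lead condition above boils down to counting the rows whose amount is lower than the previous row or higher than the next row, capped at 3. Here is a minimal pure-Python sketch of that logic (the helper name is made up for illustration, it is not part of the answer):

```python
def outcome(amounts, cap=3):
    """Count rows that take part in a decrease: a row counts if it is
    lower than the previous row or higher than the next row (the same
    condition as the lag/lead window above), capped at `cap`."""
    n = len(amounts)
    count = sum(
        1 for i in range(n)
        if (i > 0 and amounts[i - 1] > amounts[i])
        or (i < n - 1 and amounts[i + 1] < amounts[i])
    )
    return min(count, cap)
```

Running it on the three groups from the sample data reproduces the expected outcome column: 3 for id 1 (four qualifying rows, capped), 0 for id 2, and 2 for id 3.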

anky

Reputation: 75140

Here is a proposal using PySpark, which you can try replicating in Scala or SQL:

from pyspark.sql import Window
import pyspark.sql.functions as F

w = Window.partitionBy("id").orderBy(F.monotonically_increasing_id())

(df.withColumn("Diff", F.col("amount") - F.lag("amount").over(w))
   .withColumn("k", F.lead("Diff").over(w))
   .fillna(0, subset="k")
   .groupby("id")
   .agg(F.sum(F.when((F.isnull("Diff") & (F.col("k") < 0)) | (F.col("Diff") < 0), 1)
               .otherwise(0)).alias("outcome"))
   .withColumn("outcome", F.when(F.col("outcome") >= 3, 3).otherwise(F.col("outcome")))
 ).show()

+---+-------+
| id|outcome|
+---+-------+
|  1|      3|
|  2|      0|
|  3|      2|
+---+-------+

Upvotes: 2
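This answer's columns can also be traced without Spark: build the lagged differences (Diff), shift them one step to get k, then count rows where either the first-row condition or a negative difference holds. A quick pure-Python sketch of those steps (the helper name is hypothetical, not part of the answer):

```python
def outcome(amounts, cap=3):
    # Diff: amount minus the previous amount (None for the first row,
    # like the null produced by lag)
    diff = [None] + [b - a for a, b in zip(amounts, amounts[1:])]
    # k: the next row's Diff (0 for the last row, mirroring fillna(0))
    k = diff[1:] + [0]
    count = sum(
        1 for d, nxt in zip(diff, k)
        if (d is None and nxt < 0) or (d is not None and d < 0)
    )
    return min(count, cap)
```

On the sample groups this gives 3 for id 1 (four matches, capped), 0 for id 2, and 2 for id 3, matching the show() output above.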
