Reputation: 45
I need to count, for each id, how many consecutive values are decreasing, capped at a maximum of 3.
Below is my input dataset:
+---+-------+
| id| amount|
+---+-------+
|  1|   10.0|
|  1|    9.0|
|  1|    7.0|
|  1|    6.0|
|  2|   50.0|
|  2|   60.0|
|  2|   70.0|
|  3|   90.0|
|  3|   80.0|
|  3|   90.0|
+---+-------+
My required outcome is as below:
+---+--------+
| id| outcome|
+---+--------+
|  1|       3|
|  2|       0|
|  3|       2|
+---+--------+
The outcome (a new column) is computed per id and counts how many times the value decreased consecutively, up to a maximum of 3. For id 1, even though the value decreased 4 times, I only want a maximum of 3.
Any suggestion or help in Spark SQL or the Spark DataFrame API (Scala) will be greatly appreciated.
Upvotes: 2
Views: 328
Reputation: 10406
You first need an ordering column to compute the decrease. In your example there is none, so we can build an index column with monotonically_increasing_id. Then, we can use a window and the lag and lead functions to get what you want:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._ // enables the 'amount symbol syntax; assumes a SparkSession named spark

val win = Window.partitionBy("id").orderBy("index")
df
  .withColumn("index", monotonically_increasing_id())
  // a row is part of a decrease if its amount is less than the previous one
  // or greater than the next one
  .withColumn("decrease", (lag('amount, 1).over(win) > 'amount) ||
    (lead('amount, 1).over(win) < 'amount)
  )
  .groupBy("id")
  // we need to cast the boolean to an int to sum them
  .agg(sum('decrease cast "int") as "outcome")
  // capping the outcome to 3
  .withColumn("outcome", when('outcome > 3, lit(3)).otherwise('outcome))
  .orderBy("id").show
+---+-------+
| id|outcome|
+---+-------+
|  1|      3|
|  2|      0|
|  3|      2|
+---+-------+
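The flagging rule can be cross-checked without a Spark session. Below is a minimal plain-Python sketch, not part of the original answer, that applies the same rule to the sample rows from the question: a row is flagged if its amount is lower than the previous one or higher than the next one, and the per-id count is capped at 3. The helper name capped_decrease_count and its cap parameter are made up for illustration, and the sketch assumes the rows are already in the intended order.

```python
from itertools import groupby

# Sample rows copied from the question, in their original order.
rows = [
    (1, 10.0), (1, 9.0), (1, 7.0), (1, 6.0),
    (2, 50.0), (2, 60.0), (2, 70.0),
    (3, 90.0), (3, 80.0), (3, 90.0),
]


def capped_decrease_count(amounts, cap=3):
    """Count rows that belong to a decreasing step, capped at `cap`.

    Hypothetical helper: a row is flagged when it is lower than the
    previous row or higher than the next row, as in the window logic.
    """
    flagged = 0
    for i, value in enumerate(amounts):
        lower_than_previous = i > 0 and amounts[i - 1] > value
        higher_than_next = i + 1 < len(amounts) and amounts[i + 1] < value
        if lower_than_previous or higher_than_next:
            flagged += 1
    return min(flagged, cap)


# groupby needs the rows of each id to be contiguous, which holds for the sample.
outcome = {
    key: capped_decrease_count([amount for _, amount in group])
    for key, group in groupby(rows, key=lambda row: row[0])
}
print(outcome)  # {1: 3, 2: 0, 3: 2}
```

Note that this counts every flagged row in a group. That equals the length of the decreasing run only when an id has a single run, as in the sample data; an id with several separate decreasing runs would have all of them added together before the cap is applied.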
Upvotes: 2
Reputation: 75140
Here is a proposal using PySpark, which you can try replicating in Scala or SQL:
from pyspark.sql import functions as F, Window

w = Window.partitionBy("id").orderBy(F.monotonically_increasing_id())
(df.withColumn("Diff", F.col("amount") - F.lag("amount").over(w))
   # k is the next row's Diff, so the first row of a decreasing run is counted too
   .withColumn("k", F.lead("Diff").over(w))
   .fillna(0, subset=["k"]).groupby("id").agg(
       F.sum(F.when((F.isnull("Diff") & (F.col("k") < 0)) | (F.col("Diff") < 0), 1).otherwise(0))
       .alias("outcome")
   )
   # cap the outcome at 3
   .withColumn("outcome", F.when(F.col("outcome") >= 3, 3).otherwise(F.col("outcome")))
).show()
+---+-------+
| id|outcome|
+---+-------+
|  1|      3|
|  2|      0|
|  3|      2|
+---+-------+
Upvotes: 2