Reputation: 57
I have a dataframe (df) that looks like this
col1 col2 col3
a 20 0
a 21 1
a 22 0
a 23 1
a 24 0
a 25 1
b 20 0
b 21 0
b 22 1
b 23 1
b 24 0
b 25 1
I want to calculate the minimum and maximum value of col2 over the last 5 rows, considering only rows where col3 = 1, for every row in the dataframe. So, to be included in the minimum / maximum, a row must have col3 = 1.
Desired output
col1 col2 col3 minLast5 maxLast5
a 20 0 0 0
a 21 1 0 0
a 22 0 21 21
a 23 1 21 21
a 24 0 21 23
a 25 1 21 23
b 20 0 0 0
b 21 0 0 0
b 22 1 0 0
b 23 1 22 22
b 24 0 22 23
b 25 1 22 23
I have tried the following code
// NOTE(review): this is the broken attempt. The window aggregate
// min("col2")/max("col2") runs over ALL rows in the trailing range —
// the `when($"col3">0, ...)` only decides whether to DISPLAY the
// aggregate on the current row; it never filters which rows feed it.
// The condition must go INSIDE min/max (see the answers below).
df
.withColumn("minLast5", when($"col3">0, min("col2").over(Window
.partitionBy($"col1")
.orderBy($"col2")
.rangeBetween(-5,-1))
.otherwise(0))
.withColumn("maxLast5", when($"col3">0, max("col2").over(Window
.partitionBy($"col1")
.orderBy($"col2")
.rangeBetween(-5,-1))
.otherwise(0))
The above code gives the wrong output. It gives me the minimum / maximum value of col2 over all rows in the window, and merely blanks the result on rows where col3 is not 1.
Any ideas on how I could solve this?
Thanks in advance!
Upvotes: 1
Views: 278
Reputation: 8711
Check this one. Optimizations are welcome!
// Build the sample DataFrame from the question's data.
scala> val df = Seq(
| ("a",20,0 ),
| ("a",21,1 ),
| ("a",22,0 ),
| ("a",23,1 ),
| ("a",24,0 ),
| ("a",25,1 ),
| ("b",20,0 ),
| ("b",21,0 ),
| ("b",22,1 ),
| ("b",23,1 ),
| ("b",24,0 ),
| ("b",25,1 )
| ).toDF("col1","col2","col3")
df: org.apache.spark.sql.DataFrame = [col1: string, col2: int ... 1 more field]
// Attach to every row the lists of the preceding col3 flags and col2
// values inside the same col1 partition; rangeBetween(-5,-1) spans the
// previous 5 values of col2 and excludes the current row.
scala> val df2 = df.withColumn("col3list", collect_list("col3")over(Window.partitionBy($"col1").orderBy($"col2").rangeBetween(-5,-1))).withColumn("col2list",collect_list("col2")over(Window.partitionBy($"col1").orderBy($"col2").rangeBetween(-5,-1)))
df2: org.apache.spark.sql.DataFrame = [col1: string, col2: int ... 3 more fields]
/** Recovers, from two comma-joined window lists, the col2 values whose
  * matching col3 flag is set.
  *
  * @param x comma-separated col3 flags (e.g. "0,1,0"); "" for an empty window
  * @param y comma-separated col2 values aligned position-wise with `x`
  * @return  Map(col2 -> col3) for every position whose flag is > 0, or the
  *          sentinel Map(0 -> 0) when no position qualifies.
  */
def col3_max_min(x: String, y: String): Map[Int, Int] = {
  // The previous guard `x.contains(",")` silently returned the sentinel
  // for single-element windows, dropping a lone qualifying row; testing
  // for emptiness handles both the empty and the one-element window.
  if (x.trim.isEmpty) Map(0 -> 0)
  else {
    val flags  = x.split(",").map(_.trim.toInt)
    val values = y.split(",").map(_.trim.toInt)
    // Keep only (col2, col3) pairs whose flag is set; col2 values within
    // one window are distinct, so toMap loses nothing.
    val qualified = values.zip(flags).filter(_._2 > 0).toMap
    if (qualified.isEmpty) Map(0 -> 0) else qualified
  }
}
col3_max_min: (x: String, y: String)Map[Int,Int]
// Register the helper as a UDF; the list columns are joined into
// comma-separated strings because the helper takes String arguments.
scala> val myudfcol3 = udf( col3_max_min(_:String,_:String):Map[Int,Int] )
myudfcol3: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function2>,MapType(IntegerType,IntegerType,false),Some(List(StringType, StringType)))
scala> val df3 = df2.withColumn("max_min",myudfcol3( concat_ws(",",col("col3list")), concat_ws(",",col("col2list"))))
df3: org.apache.spark.sql.DataFrame = [col1: string, col2: int ... 4 more fields]
// The map keys are the qualifying col2 values, so the last / first
// elements of the sorted key array give the max / min of the window.
scala> val df4 = df3.withColumn("maxx",sort_array(map_keys(col("max_min")))(size(col("max_min"))-1)).withColumn("minn",sort_array(map_keys(col("max_min")))(0))
df4: org.apache.spark.sql.DataFrame = [col1: string, col2: int ... 6 more fields]
scala> df4.select('col1,'col2,'col3,'minn,'maxx).show(false)
+----+----+----+----+----+
|col1|col2|col3|minn|maxx|
+----+----+----+----+----+
|b |20 |0 |0 |0 |
|b |21 |0 |0 |0 |
|b |22 |1 |0 |0 |
|b |23 |1 |22 |22 |
|b |24 |0 |22 |23 |
|b |25 |1 |22 |23 |
|a |20 |0 |0 |0 |
|a |21 |1 |0 |0 |
|a |22 |0 |21 |21 |
|a |23 |1 |21 |21 |
|a |24 |0 |21 |23 |
|a |25 |1 |21 |23 |
+----+----+----+----+----+
scala>
Upvotes: 1
Reputation: 7207
The "when" condition can be moved inside the min/max aggregate functions:
// Sample data from the question.
val data = List(
  ("a", 20, 0),
  ("a", 21, 1),
  ("a", 22, 0),
  ("a", 23, 1),
  ("a", 24, 0),
  ("a", 25, 1),
  ("b", 20, 0),
  ("b", 21, 0),
  ("b", 22, 1),
  ("b", 23, 1),
  ("b", 24, 0),
  ("b", 25, 1)
).toDF("col1", "col2", "col3")

// Trailing window: the previous 5 values of col2 within each col1
// partition, excluding the current row.
val last5 = Window
  .partitionBy($"col1")
  .orderBy($"col2")
  .rangeBetween(-5, -1)

// Rows with col3 != 1 become null and are therefore ignored by the
// min/max aggregates — the condition sits INSIDE the aggregate.
val qualifying = when($"col3" === 1, $"col2").otherwise(lit(null))

val result = data
  .withColumn("minLast5", min(qualifying).over(last5))
  .withColumn("maxLast5", max(qualifying).over(last5))
  // windows with no qualifying row yield null; map those to 0
  .withColumn("minLast5", when($"minLast5".isNull, 0).otherwise($"minLast5"))
  .withColumn("maxLast5", when($"maxLast5".isNull, 0).otherwise($"maxLast5"))

result.show(false)
Output:
+----+----+----+--------+--------+
|col1|col2|col3|minLast5|maxLast5|
+----+----+----+--------+--------+
|a |20 |0 |0 |0 |
|a |21 |1 |0 |0 |
|a |22 |0 |21 |21 |
|a |23 |1 |21 |21 |
|a |24 |0 |21 |23 |
|a |25 |1 |21 |23 |
|b |20 |0 |0 |0 |
|b |21 |0 |0 |0 |
|b |22 |1 |0 |0 |
|b |23 |1 |22 |22 |
|b |24 |0 |22 |23 |
|b |25 |1 |22 |23 |
+----+----+----+--------+--------+
Upvotes: 1