Mauricio Ortiz

Reputation: 33

Compare Values of the Current and Previous Rows, Then the Next Column if Required, in Spark

I am trying to select the value of one column based on the values of other rows and other columns.

scala> val df = Seq((1,"051",0,0,10,0),(1,"052",0,0,0,0),(2,"053",10,0,10,0),(2,"054",0,0,10,0),(3,"055",100,50,0,0),(3,"056",100,10,0,0),(3,"057",100,20,0,0),(4,"058",70,15,0,0),(4,"059",70,15,0,20),(4,"060",70,15,0,0)).toDF("id","code","value_1","value_2","value_3","Value_4")
scala> df.show()
+---+----+-------+-------+-------+-------+
| id|code|value_1|value_2|value_3|Value_4|
+---+----+-------+-------+-------+-------+
|  1| 051|      0|      0|     10|      0|
|  1| 052|      0|      0|      0|      0|
|  2| 053|     10|      0|     10|      0|
|  2| 054|      0|      0|     10|      0|
|  3| 055|    100|     50|      0|      0|
|  3| 056|    100|     10|      0|      0|
|  3| 057|    100|     20|      0|      0|
|  4| 058|     70|     15|      0|      0|
|  4| 059|     70|     15|      0|     20|
|  4| 060|     70|     15|      0|      0|
+---+----+-------+-------+-------+-------+

Calculation Logic:

Select one code for each id, following these steps:

  1. For each column value_n (value_1, value_2, value_3, value_4), do the following.
  2. Within the same id, look for the maximum value in the value_n column.
  3. If that maximum value is repeated, evaluate the next column.
  4. Otherwise, if the maximum value occurs only once, take the id and the code of the row holding that maximum. The remaining columns no longer need to be evaluated.

Expected Output:

+---+----+-------+-------+-------+-------+
| id|code|value_1|value_2|value_3|Value_4|
+---+----+-------+-------+-------+-------+
|  1| 051|      0|      0|     10|      0|
|  2| 053|     10|      0|     10|      0|
|  3| 055|    100|     50|      0|      0|
|  4| 059|     70|     15|      0|     20|
+---+----+-------+-------+-------+-------+

In the case of id 3: value_1 is 100 for all three rows, so the tie moves to value_2; its maximum, 50, appears only once, so code 055 is selected.
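To make the rule concrete, here is a minimal plain-Scala sketch of the selection logic on an in-memory collection (the Rec case class and pick function are just for illustration, not part of my Spark code); this is what I am trying to express with DataFrames:

case class Rec(id: Int, code: String, values: Seq[Int])

// Walk the value columns left to right; stop at the first column whose
// maximum is held by exactly one row, and return that row's code.
def pick(rows: Seq[Rec]): Option[String] = {
  val nCols = rows.head.values.length
  (0 until nCols).view.flatMap { i =>
    val mx      = rows.map(_.values(i)).max
    val winners = rows.filter(_.values(i) == mx)
    if (winners.size == 1) Some(winners.head.code) else None
  }.headOption
}

// id 3: value_1 ties at 100 for all rows, value_2 has a unique max of 50 -> "055"
pick(Seq(
  Rec(3, "055", Seq(100, 50, 0, 0)),
  Rec(3, "056", Seq(100, 10, 0, 0)),
  Rec(3, "057", Seq(100, 20, 0, 0))
))  // Some("055")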

Please help.

Upvotes: 1

Views: 738

Answers (2)

werner

Reputation: 14845

If the data is in a form where this algorithm is guaranteed to always select exactly one row per id, the following code produces the expected result:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{avg, max}

val w = Window.partitionBy("id")

// Add per-id max and avg for every value column; max <> avg means the
// values in that column are not all equal within the id group.
var df2 = df
val cols = Seq("value_1", "value_2", "value_3", "value_4")
for (col <- cols) {
  df2 = df2.withColumn(s"${col}_max", max(col).over(w))
    .withColumn(s"${col}_avg", avg(col).over(w))
}

// Build a filter that keeps a row as soon as some column has a
// non-repeated maximum and the row holds that maximum.
var sel = ""
for (col <- cols) {
  sel += s"(${col}_max <> ${col}_avg and ${col} = ${col}_max) or "
}
sel = sel.dropRight(4)

df2.filter(sel).select("id", ("code" +: cols): _*).sort("id", "code").show()
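A slightly more compact way to build the same filter expression, assuming the same cols sequence as above, is to join the per-column predicates with mkString so there is no trailing "or" to trim off:

// hypothetical alternative to the var/dropRight construction above
val sel2 = cols
  .map(c => s"(${c}_max <> ${c}_avg and $c = ${c}_max)")
  .mkString(" or ")

df2.filter(sel2).select("id", ("code" +: cols): _*).sort("id", "code").show()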

Upvotes: 1

C.S.Reddy Gadipally

Reputation: 1758

You can put your value_1 to value_4 columns into a struct and call the max function on it over a window partitioned by the id column.


scala> df.show
+---+----+-------+-------+-------+-------+
| id|code|value_1|value_2|value_3|Value_4|
+---+----+-------+-------+-------+-------+
|  1| 051|      0|      0|     10|      0|
|  1| 052|      0|      0|      0|      0|
|  2| 053|     10|      0|     10|      0|
|  2| 054|      0|      0|     10|      0|
|  3| 055|    100|     50|      0|      0|
|  3| 056|    100|     10|      0|      0|
|  3| 057|    100|     20|      0|      0|
|  4| 058|     70|     15|      0|      0|
|  4| 059|     70|     15|      0|     20|
|  4| 060|     70|     15|      0|      0|
+---+----+-------+-------+-------+-------+


scala> val dfWithVals = df.withColumn("values", struct($"value_1", $"value_2", $"value_3", $"value_4"))
dfWithVals: org.apache.spark.sql.DataFrame = [id: int, code: string ... 5 more fields]

scala> dfWithVals.show
+---+----+-------+-------+-------+-------+---------------+
| id|code|value_1|value_2|value_3|Value_4|         values|
+---+----+-------+-------+-------+-------+---------------+
|  1| 051|      0|      0|     10|      0|  [0, 0, 10, 0]|
|  1| 052|      0|      0|      0|      0|   [0, 0, 0, 0]|
|  2| 053|     10|      0|     10|      0| [10, 0, 10, 0]|
|  2| 054|      0|      0|     10|      0|  [0, 0, 10, 0]|
|  3| 055|    100|     50|      0|      0|[100, 50, 0, 0]|
|  3| 056|    100|     10|      0|      0|[100, 10, 0, 0]|
|  3| 057|    100|     20|      0|      0|[100, 20, 0, 0]|
|  4| 058|     70|     15|      0|      0| [70, 15, 0, 0]|
|  4| 059|     70|     15|      0|     20|[70, 15, 0, 20]|
|  4| 060|     70|     15|      0|      0| [70, 15, 0, 0]|
+---+----+-------+-------+-------+-------+---------------+


scala> val overColumns = org.apache.spark.sql.expressions.Window.partitionBy("id")
overColumns: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@de0daca

scala> dfWithVals.withColumn("maxvals", max($"values").over(overColumns)).filter($"values" === $"maxvals").show
+---+----+-------+-------+-------+-------+---------------+---------------+      
| id|code|value_1|value_2|value_3|Value_4|         values|        maxvals|
+---+----+-------+-------+-------+-------+---------------+---------------+
|  1| 051|      0|      0|     10|      0|  [0, 0, 10, 0]|  [0, 0, 10, 0]|
|  3| 055|    100|     50|      0|      0|[100, 50, 0, 0]|[100, 50, 0, 0]|
|  4| 059|     70|     15|      0|     20|[70, 15, 0, 20]|[70, 15, 0, 20]|
|  2| 053|     10|      0|     10|      0| [10, 0, 10, 0]| [10, 0, 10, 0]|
+---+----+-------+-------+-------+-------+---------------+---------------+



scala> dfWithVals.withColumn("maxvals", max($"values").over(overColumns)).filter($"values" === $"maxvals").drop("values", "maxvals").show
+---+----+-------+-------+-------+-------+                                      
| id|code|value_1|value_2|value_3|Value_4|
+---+----+-------+-------+-------+-------+
|  1| 051|      0|      0|     10|      0|
|  3| 055|    100|     50|      0|      0|
|  4| 059|     70|     15|      0|     20|
|  2| 053|     10|      0|     10|      0|
+---+----+-------+-------+-------+-------+
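This works because Spark compares struct values field by field, from left to right, so taking max over the struct reproduces the "value_1 first, then value_2, ..." tie-breaking. A quick self-contained check of that ordering (the a/b column names are only for illustration):

scala> Seq((100, 50), (100, 10), (100, 20)).toDF("a", "b").select(max(struct($"a", $"b"))).show
// the first field ties at 100, so the second field decides: expect [100, 50]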

Upvotes: 1
