Reputation: 33
I am trying to select the value of one column based on the values of other rows and other columns.
scala> val df = Seq((1,"051",0,0,10,0),(1,"052",0,0,0,0),(2,"053",10,0,10,0),(2,"054",0,0,10,0),(3,"055",100,50,0,0),(3,"056",100,10,0,0),(3,"057",100,20,0,0),(4,"058",70,15,0,0),(4,"059",70,15,0,20),(4,"060",70,15,0,0)).toDF("id","code","value_1","value_2","value_3","Value_4")
+---+----+-------+-------+-------+-------+
| id|code|value_1|value_2|value_3|Value_4|
+---+----+-------+-------+-------+-------+
|  1| 051|      0|      0|     10|      0|
|  1| 052|      0|      0|      0|      0|
|  2| 053|     10|      0|     10|      0|
|  2| 054|      0|      0|     10|      0|
|  3| 055|    100|     50|      0|      0|
|  3| 056|    100|     10|      0|      0|
|  3| 057|    100|     20|      0|      0|
|  4| 058|     70|     15|      0|      0|
|  4| 059|     70|     15|      0|     20|
|  4| 060|     70|     15|      0|      0|
+---+----+-------+-------+-------+-------+
Calculation Logic:
Select a code for an id, following the steps
Expected Output:
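+---+----+-------+-------+-------+-------+
| id|code|value_1|value_2|value_3|Value_4|
+---+----+-------+-------+-------+-------+
|  1| 051|      0|      0|     10|      0|
|  2| 053|     10|      0|     10|      0|
|  3| 055|    100|     50|      0|      0|
|  4| 059|     70|     15|      0|     20|
+---+----+-------+-------+-------+-------+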
In case of id 3:
Please help.
Upvotes: 1
Views: 738
Reputation: 14845
If the data is such that the algorithm is guaranteed to always select exactly one row per id, the following code produces the expected result:
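import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{avg, max}

val w = Window.partitionBy("id")
var df2 = df
val cols = Seq("value_1", "value_2", "value_3", "value_4")
// Add the per-id maximum and average of every value column.
for (col <- cols) {
  df2 = df2.withColumn(s"${col}_max", max(col).over(w))
    .withColumn(s"${col}_avg", avg(col).over(w))
}
// Keep a row if, for some column, the values differ within the id (max <> avg)
// and this row holds that column's maximum.
val sel = cols.map(col => s"(${col}_max <> ${col}_avg and ${col} = ${col}_max)").mkString(" or ")
df2.filter(sel).select("id", ("code" +: cols): _*).sort("id", "code").show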
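To see why the one-row-per-id guarantee matters, the filter condition can be replayed on plain Scala collections. This is only a rough sketch without Spark; the helper name `selectCodes` and the two-row "ambiguous" data set are made up for illustration and do not come from the question.

```scala
// Plain-Scala sketch (no Spark) of the filter condition used in the answer above.
// A row is kept if, for at least one column, the values differ within the id
// (max != average) and the row holds that column's maximum.
// `selectCodes` is a hypothetical helper name; input is (code, values) per row of one id.
def selectCodes(rows: Seq[(String, Seq[Int])]): Seq[String] = {
  val nCols = rows.head._2.length
  val kept = (0 until nCols).flatMap { c =>
    val column = rows.map(_._2(c))
    val colMax = column.max
    val colAvg = column.sum.toDouble / column.length
    if (colMax.toDouble != colAvg) rows.filter(_._2(c) == colMax).map(_._1)
    else Seq.empty[String]
  }
  kept.distinct.sorted
}

// id 3 from the question: only value_2 differs, so exactly one row is kept.
val id3 = selectCodes(Seq(
  "055" -> Seq(100, 50, 0, 0),
  "056" -> Seq(100, 10, 0, 0),
  "057" -> Seq(100, 20, 0, 0)
))

// Hypothetical data (not from the question): two columns differ and their maxima
// sit in different rows, so both rows pass the filter.
val ambiguous = selectCodes(Seq("a" -> Seq(10, 0), "b" -> Seq(0, 10)))
```

Here `id3` contains only "055", while `ambiguous` contains both "a" and "b", which is the situation the caveat at the top of this answer rules out.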
Upvotes: 1
Reputation: 1758
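You can put value_1 through value_4 into a struct and take the max of that struct over a window partitioned by id. Structs are compared field by field from left to right, so the maximum struct identifies the row to keep.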
+---+----+-------+-------+-------+-------+
| id|code|value_1|value_2|value_3|Value_4|
+---+----+-------+-------+-------+-------+
|  1| 051|      0|      0|     10|      0|
|  1| 052|      0|      0|      0|      0|
|  2| 053|     10|      0|     10|      0|
|  2| 054|      0|      0|     10|      0|
|  3| 055|    100|     50|      0|      0|
|  3| 056|    100|     10|      0|      0|
|  3| 057|    100|     20|      0|      0|
|  4| 058|     70|     15|      0|      0|
|  4| 059|     70|     15|      0|     20|
|  4| 060|     70|     15|      0|      0|
+---+----+-------+-------+-------+-------+
scala> val dfWithVals = df.withColumn("values", struct($"value_1", $"value_2", $"value_3", $"value_4"))
dfWithVals: org.apache.spark.sql.DataFrame = [id: int, code: string ... 5 more fields]
+---+----+-------+-------+-------+-------+---------------+
| id|code|value_1|value_2|value_3|Value_4|         values|
+---+----+-------+-------+-------+-------+---------------+
|  1| 051|      0|      0|     10|      0|  [0, 0, 10, 0]|
|  1| 052|      0|      0|      0|      0|   [0, 0, 0, 0]|
|  2| 053|     10|      0|     10|      0| [10, 0, 10, 0]|
|  2| 054|      0|      0|     10|      0|  [0, 0, 10, 0]|
|  3| 055|    100|     50|      0|      0|[100, 50, 0, 0]|
|  3| 056|    100|     10|      0|      0|[100, 10, 0, 0]|
|  3| 057|    100|     20|      0|      0|[100, 20, 0, 0]|
|  4| 058|     70|     15|      0|      0| [70, 15, 0, 0]|
|  4| 059|     70|     15|      0|     20|[70, 15, 0, 20]|
|  4| 060|     70|     15|      0|      0| [70, 15, 0, 0]|
+---+----+-------+-------+-------+-------+---------------+
scala> val overColumns = org.apache.spark.sql.expressions.Window.partitionBy("id")
overColumns: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@de0daca
scala> dfWithVals.withColumn("maxvals", max($"values").over(overColumns)).filter($"values" === $"maxvals").show
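+---+----+-------+-------+-------+-------+---------------+---------------+
| id|code|value_1|value_2|value_3|Value_4|         values|        maxvals|
+---+----+-------+-------+-------+-------+---------------+---------------+
|  1| 051|      0|      0|     10|      0|  [0, 0, 10, 0]|  [0, 0, 10, 0]|
|  3| 055|    100|     50|      0|      0|[100, 50, 0, 0]|[100, 50, 0, 0]|
|  4| 059|     70|     15|      0|     20|[70, 15, 0, 20]|[70, 15, 0, 20]|
|  2| 053|     10|      0|     10|      0| [10, 0, 10, 0]| [10, 0, 10, 0]|
+---+----+-------+-------+-------+-------+---------------+---------------+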
scala> dfWithVals.withColumn("maxvals", max($"values").over(overColumns)).filter($"values" === $"maxvals").drop("values", "maxvals").show
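+---+----+-------+-------+-------+-------+
| id|code|value_1|value_2|value_3|Value_4|
+---+----+-------+-------+-------+-------+
|  1| 051|      0|      0|     10|      0|
|  3| 055|    100|     50|      0|      0|
|  4| 059|     70|     15|      0|     20|
|  2| 053|     10|      0|     10|      0|
+---+----+-------+-------+-------+-------+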
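The ordering that makes the struct approach work can be illustrated with plain Scala tuples, which are also compared field by field from left to right. This is a minimal sketch without Spark, assuming the rows for ids 3 and 4 from the question; the `Record` case class and the `selected` value are hypothetical names introduced only for this example.

```scala
// Plain-Scala sketch (no Spark): tuples are ordered field by field, left to right,
// which mirrors how max over the struct picks one row per id in the answer above.
// `Record` is a hypothetical stand-in for a DataFrame row.
final case class Record(id: Int, code: String, values: (Int, Int, Int, Int))

val records = Seq(
  Record(3, "055", (100, 50, 0, 0)),
  Record(3, "056", (100, 10, 0, 0)),
  Record(3, "057", (100, 20, 0, 0)),
  Record(4, "058", (70, 15, 0, 0)),
  Record(4, "059", (70, 15, 0, 20)),
  Record(4, "060", (70, 15, 0, 0))
)

// For each id, keep the code of the row whose value tuple is the lexicographic maximum.
val selected: Map[Int, String] =
  records.groupBy(_.id).map { case (id, group) => id -> group.maxBy(_.values).code }
```

For id 3 the first field ties at 100, so the second field decides and "055" wins; for id 4 the first three fields tie and the fourth field selects "059". Note that `maxBy` returns a single record per id, whereas the Spark filter on `values === maxvals` would keep every row that ties on all four fields.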
Upvotes: 1