Teju Priya

Reputation: 665

Group by and filter highest value in DataFrame in Scala

I have some data like this:

a,timestamp,list,rid,sbid,avgvalue
1,1011,1001,4,4,1.20
2,1000,819,2,3,2.40
1,1011,107,1,3,5.40
1,1021,819,1,1,2.10

In the data above, I want to find, for each timestamp and tag (a), the row with the highest avgvalue. Like this:

For timestamp 1011 and a = 1:

1,1011,1001,4,4,1.20
1,1011,107,1,3,5.40

The output would be:

1,1011,107,1,3,5.40  //because for timestamp 1011 and tag 1 the highest avg value is 5.40

So I need to pick this row.

I tried this, but it still does not work properly:

df.registerTempTable("high_value")
val highvalue = sqlContext.sql("select a,timestamp,list,rid,sbid,avgvalue from high_value")

highvalue.select($"a", $"timestamp", $"list", $"rid", $"sbid", $"avgvalue".cast(IntegerType).as("higher_value"))
  .groupBy("a", "timestamp")
  .max("higher_value")

highvalue.collect.foreach(println)

Any help will be appreciated.

Edit: After I applied some of your suggestions, I am still getting duplicates in my data:

+---+----------+----+----+----+--------+
|  a| timestamp|list| rid|sbid|avgvalue|
+---+----------+----+----+----+--------+
|  4|1496745915| 718|   4|   3|    0.30|
|  4|1496745918| 362|   4|   3|    0.60|
|  4|1496745913| 362|   4|   3|    0.60|
|  2|1496745918| 362|   4|   3|    0.10|
|  3|1496745912| 718|   4|   3|    0.05|
|  2|1496745918| 718|   4|   3|    0.30|
|  4|1496745911|1901|   4|   3|    0.60|
|  4|1496745912| 718|   4|   3|    0.60|
|  2|1496745915| 362|   4|   3|    0.30|
|  2|1496745915|1901|   4|   3|    0.30|
|  2|1496745910|1901|   4|   3|    0.30|
|  3|1496745915| 362|   4|   3|    0.10|
|  4|1496745918|3878|   4|   3|    0.10|
|  4|1496745915|1901|   4|   3|    0.60|
|  4|1496745912| 362|   4|   3|    0.60|
|  4|1496745914|1901|   4|   3|    0.60|
|  4|1496745912|3878|   4|   3|    0.10|
|  4|1496745912| 718|   4|   3|    0.30|
|  3|1496745915|3878|   4|   3|    0.05|
|  4|1496745914| 362|   4|   3|    0.60|
+---+----------+----+----+----+--------+

|  4|1496745918| 362|   4|   3|    0.60|
|  4|1496745918|3878|   4|   3|    0.10|

Both rows have the same timestamp and the same tag, so I consider them duplicates; only the row with the higher avgvalue should remain.

This is my code:

rdd.createTempView("v1")
val rdd2 = sqlContext.sql("select max(avgvalue) as max from v1 group by (a,timestamp)")
rdd2.createTempView("v2")
val rdd3 = sqlContext.sql("select a,timestamp,list,rid,sbid,avgvalue from v1 join v2 on v2.max=v1.avgvalue").show()

Upvotes: 4

Views: 13756

Answers (4)

Kyr

Reputation: 5491

None of the other solutions provided here gave me the correct answer, so this is what worked for me, using row_number():

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

val windowSpec = Window.partitionBy("timestamp").orderBy(desc("avgvalue"))

df.select("a", "timestamp", "list", "rid", "sbid", "avgvalue")
  .withColumn("largest_avgvalue", row_number().over(windowSpec))
  .filter($"largest_avgvalue" === 1)
  .drop("largest_avgvalue")
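
This window partitions by timestamp alone. If the grouping key is really the pair (a, timestamp), as the question suggests, a sketch of the same approach partitioned on both columns would be (pairWindow is just an illustrative name):

// Assumption: the intended grouping key is (a, timestamp).
val pairWindow = Window.partitionBy("a", "timestamp").orderBy(desc("avgvalue"))

df.select("a", "timestamp", "list", "rid", "sbid", "avgvalue")
  .withColumn("rn", row_number().over(pairWindow))
  .filter($"rn" === 1)  // keep only the top row of each (a, timestamp) partition
  .drop("rn")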

The other solutions had the following problems in my tests:

  • The solution with .agg( max(x).as(x), first(y).as(y), ... ) doesn't work because, according to the documentation, the first() function "will return the first value it sees", which makes it non-deterministic.
  • The solution with .withColumn("x", max("y") over windowSpec.orderBy("m") ) doesn't work because, once the window has an orderBy(), the max is evaluated over a running frame, so it equals the row's own value for more than one row per partition. I believe the problem there is the orderBy().

Hence, the following also gives the correct answer, with max():

val windowSpec = Window.partitionBy("timestamp").orderBy(desc("avgvalue"))
    
df.select("a", "timestamp", "list", "rid", "sbid", "avgvalue")
  .withColumn("largest_avgvalue", max("avgvalue").over(windowSpec))
  .filter($"largest_avgvalue" === $"avgvalue")
  .drop("largest_avgvalue")
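
Note that, unlike row_number(), the max() version keeps every row that ties for the maximum, so the two variants only agree when the maximum within each partition is unique.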

Upvotes: 1

Ramesh Maharjan

Reputation: 41957

You can use the DataFrame API to find the max as below:

df.groupBy("timestamp").agg(max("avgvalue"))

This will give you output as:

+---------+-------------+
|timestamp|max(avgvalue)|
+---------+-------------+
|1021     |2.1          |
|1000     |2.4          |
|1011     |5.4          |
+---------+-------------+

which doesn't include the other fields you require, so you can use first as:

df.groupBy("timestamp").agg(max("avgvalue") as "avgvalue", first("a") as "a", first("list") as "list", first("rid") as "rid", first("sbid") as "sbid") 

You should have output as:

+---------+--------+---+----+---+----+
|timestamp|avgvalue|a  |list|rid|sbid|
+---------+--------+---+----+---+----+
|1021     |2.1     |1  |819 |1  |1   |
|1000     |2.4     |2  |819 |2  |3   |
|1011     |5.4     |1  |1001|4  |4   |
+---------+--------+---+----+---+----+

The above solution still would not give you the correct row-wise output, so you can use a window function and select the correct row as:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

val windowSpec = Window.partitionBy("timestamp").orderBy("a")

df.withColumn("newavg", max("avgvalue") over windowSpec)
  .filter(col("newavg") === col("avgvalue"))
  .drop("newavg").show(false)

This will give row-wise correct data as

+---+---------+----+---+----+--------+
|a  |timestamp|list|rid|sbid|avgvalue|
+---+---------+----+---+----+--------+
|1  |1021     |819 |1  |1   |2.1     |
|2  |1000     |819 |2  |3   |2.4     |
|1  |1011     |107 |1  |3   |5.4     |
+---+---------+----+---+----+--------+

Upvotes: 4

Ishan Kumar

Reputation: 1982

I saw the above answers. Below is one more you can try as well:

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
case class Tags(a: Int, timestamp: Int, list: Int, rid: Int, sbid: Int, avgvalue: Double)
val rdd = sc.textFile("file:/home/hdfs/stackOverFlow").map(_.split(","))
  .map(x => Tags(x(0).toInt, x(1).toInt, x(2).toInt, x(3).toInt, x(4).toInt, x(5).toDouble)).toDF
rdd.createTempView("v1")
val rdd2=sqlContext.sql("select max(avgvalue) as max from v1 group by (a,timestamp)")
rdd2.createTempView("v2")
val rdd3=sqlContext.sql("select a,timestamp,list,rid,sbid,avgvalue from v1 join v2 on v2.max=v1.avgvalue").show()

Output:

+---+---------+----+---+----+--------+
|  a|timestamp|list|rid|sbid|avgvalue|
+---+---------+----+---+----+--------+
|  2|     1000| 819|  2|   3|     2.4|
|  1|     1011| 107|  1|   3|     5.4|
|  1|     1021| 819|  1|   1|     2.1|
+---+---------+----+---+----+--------+
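
Note that the join condition matches on the value alone, so any row whose avgvalue happens to equal the max of some other (a, timestamp) group is also kept, which is likely the source of the duplicates mentioned in the question's edit. A sketch of the same approach that joins on the grouping keys as well (grouped is just an illustrative name) would be:

// Assumption: the view v1 is the one created above; v2 is replaced here.
val grouped = sqlContext.sql("select a, timestamp, max(avgvalue) as max from v1 group by a, timestamp")
grouped.createOrReplaceTempView("v2")
sqlContext.sql("select v1.a, v1.timestamp, v1.list, v1.rid, v1.sbid, v1.avgvalue from v1 join v2 on v1.a = v2.a and v1.timestamp = v2.timestamp and v1.avgvalue = v2.max").show()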

Upvotes: 0

koiralo

Reputation: 23109

You can use groupBy and find the max value for that particular group as:

// If you have the dataframe as df, then:

df.groupBy("a", "timestamp").agg(max($"avgvalue").alias("maxAvgValue"))
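
This aggregation returns only the grouping columns and the max, though. If the full rows are needed, one common pattern is to join the per-group max back onto the original rows (a sketch, assuming the same df; maxDf and maxAvgValue are illustrative names):

import org.apache.spark.sql.functions.max

// Join the per-group max back, then keep rows whose avgvalue equals it.
val maxDf = df.groupBy("a", "timestamp").agg(max($"avgvalue").alias("maxAvgValue"))

df.join(maxDf, Seq("a", "timestamp"))
  .filter($"avgvalue" === $"maxAvgValue")
  .drop("maxAvgValue")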

Hope this helps

Upvotes: 0
