Jessi joseph

Reputation: 191

aggregate multiple columns in spark window function

I am trying to aggregate over multiple columns. Here is my scenario.

I have the columns TagID, ListnerID, Timestamp and RSSI_Weightage.

I want to select the highest RSSI_Weightage for each TagID. I also need to pick the highest Timestamp for that same TagID.

Example.

TagID,ListnerID,Timestamp,RSSI_Weightage
2,101,1496745906,90
3,102,1496745907,70
3,104,1496745906,80
2,101,1496745909,60
4,106,1496745908,60

My expected output

2,101,1496745909,90
3,104,1496745907,80
4,106,1496745908,60

Explanation

For TagID 2 the highest RSSI_Weightage is 90, but the latest Timestamp is 1496745909. That gives my first row. I did the same for every TagID.

The above is my expected output. I tried this:

val window = Window.partitionBy("tagShortID", "ListenerShortID").orderBy("RSSI_Weightage","Timestamp")
 val prox = averageDF.withColumn("rank", row_number().over(window)).where($"rank" === 1) 

But the statement above does not work properly: I am getting incorrect results. Also, if I apply desc in the orderBy, it throws an error.

Upvotes: 0

Views: 4702

Answers (1)

Ramesh Maharjan

Reputation: 41957

Given the dataframe as

+----------+---------------+----------+---------------+
|tagShortID|ListenerShortID|Timestamp |RSSI_Weight_avg|
+----------+---------------+----------+---------------+
|2         |101            |1496745906|90             |
|3         |102            |1496745907|70             |
|3         |104            |1496745906|80             |
|2         |101            |1496745909|60             |
|4         |106            |1496745908|60             |
+----------+---------------+----------+---------------+

You can do this with a Window function, but you will need a few additional steps such as filter and drop:

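import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
// assumes spark.implicits._ is imported for the $"..." column syntax
val window = Window.partitionBy($"tagShortID")
val prox = averageDF
  // replace each row's weight with the maximum weight within its tag
  .withColumn("RSSI_Weight_avg", max($"RSSI_Weight_avg").over(window))
  // rank the rows of each tag by Timestamp, latest first, and keep only the top one
  .withColumn("rank", rank().over(window.orderBy($"Timestamp".desc)))
  .filter($"rank" === 1)
  .drop("rank")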

You will get the following result:

+----------+---------------+----------+---------------+
|tagShortID|ListenerShortID|Timestamp |RSSI_Weight_avg|
+----------+---------------+----------+---------------+
|3         |102            |1496745907|80             |
|4         |106            |1496745908|60             |
|2         |101            |1496745909|90             |
+----------+---------------+----------+---------------+

You can improve the above code, though. I hope the answer is helpful.
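
One thing to note: the result above takes ListenerShortID from the row with the latest Timestamp (102 for tag 3), whereas the expected output in the question shows 104, the listener that has the highest weight. If that is what is wanted, a minimal sketch of a variant could look like the code below. The object name TagAggregation and the function name strongestPerTag are hypothetical, chosen only for illustration; the column names are the ones from the dataframe above. The sort keys are written as col(...).desc because desc is a method on Column and not on a plain string. Whether that was the cause of the error mentioned in the question is only a guess, since the error message is not shown.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, max, row_number}

// Hypothetical helper object, not part of Spark or of the original answer.
object TagAggregation {

  // Keeps, for each tag, the row with the highest weight and stamps it
  // with the latest Timestamp seen for that tag.
  def strongestPerTag(df: DataFrame): DataFrame = {
    // All rows of one tag, no ordering: aggregates see the whole partition.
    val byTag = Window.partitionBy(col("tagShortID"))
    // Same partition, highest weight first; ties broken by latest Timestamp.
    val byWeight = byTag.orderBy(col("RSSI_Weight_avg").desc, col("Timestamp").desc)

    df
      // Number the rows first, while Timestamp still holds the original values.
      .withColumn("rank", row_number().over(byWeight))
      // Then overwrite Timestamp with the maximum Timestamp of the tag.
      .withColumn("Timestamp", max(col("Timestamp")).over(byTag))
      .filter(col("rank") === 1)
      .drop("rank")
  }

  def main(args: Array[String]): Unit = {
    // Local session for trying the sketch out; in spark-shell one already exists.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("tag-aggregation")
      .getOrCreate()
    import spark.implicits._

    // Sample rows from the question.
    val averageDF = Seq(
      (2, 101, 1496745906L, 90),
      (3, 102, 1496745907L, 70),
      (3, 104, 1496745906L, 80),
      (2, 101, 1496745909L, 60),
      (4, 106, 1496745908L, 60)
    ).toDF("tagShortID", "ListenerShortID", "Timestamp", "RSSI_Weight_avg")

    strongestPerTag(averageDF).show(false)
    spark.stop()
  }
}
```

On the sample data this should give the three rows from the expected output in the question (the row order may vary). row_number() is used here instead of rank() so that exactly one row per tag survives even when two rows tie on both weight and Timestamp.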

Upvotes: 3
