I need help with the logic. I have data like this:
tag,timestamp,listner,org,suborg,rssi
4,101,1901,4,3,0.60
4,110,1901,4,3,0.90
4,104,1901,4,3,0.30
4,109,1901,4,3,0.40
4,111,1901,4,3,0.60
4,128,1901,4,3,0.40
4,129,1901,4,3,0.80
4,131,1901,4,3,0.60
4,133,1901,4,3,0.30
4,143,1901,4,3,0.60
4,147,1901,4,3,0.70
4,148,1901,4,3,0.40
4,149,1901,4,3,0.30
4,150,1901,4,3,0.90
I have to find the average of the rssi column over the rows that fall within a 10-second timeframe, working back from each timestamp starting with the latest.
This is my expected output:
tagShortID,timestamp,listenerShortID,rootOrgID,subOrgID,rssi_Weight,rssi_Weight_avg
4,150,1901,4,3,0.9,0.58
4,149,1901,4,3,0.3,0.5
4,148,1901,4,3,0.4,0.56
4,147,1901,4,3,0.7,0.64
4,143,1901,4,3,0.6,0.44
4,133,1901,4,3,0.3,0.525
4,131,1901,4,3,0.6,0.6
4,129,1901,4,3,0.8,0.6
4,128,1901,4,3,0.4,0.4
4,111,1901,4,3,0.6,0.6
4,110,1901,4,3,0.9,0.9
4,109,1901,4,3,0.4,0.4
4,104,1901,4,3,0.3,0.3
4,101,1901,4,3,0.6,0.6
I tried this:
df.withColumn("firstValue", first("Timestamp") over Window.orderBy($"Timestamp".desc).partitionBy("tagShortID", "ListenerShortID"))
.filter($"firstValue".cast("long")-$"Timestamp".cast("long") <= 10)
.withColumn("count", count("Timestamp") over Window.partitionBy("tagShortID", "ListenerShortID"))
.withColumn("RSSI_Weight", when($"count" >= 10, avg($"RSSI_Weight") over Window.orderBy("Timestamp").partitionBy("tagShortID", "ListenerShortID").rowsBetween(Long.MinValue, 0)) otherwise($"RSSI_Weight"))
.drop("firstValue", "count")
.show(30, false)
The above only takes the highest timestamp and subtracts 10 seconds from it. But I need to iterate over each and every timestamp and check its own 10-second window: if the window check passes, take the average; else take the rssi value as-is.
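For example, for timestamp 150 the window [140, 150] covers the readings at 143, 147, 148, 149 and 150, so the average is (0.6 + 0.7 + 0.4 + 0.3 + 0.9) / 5 = 0.58, while for timestamp 110 there is no reading older than 100, so the rssi value 0.9 is kept as-is.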
Any help will be appreciated.
You can use the following logic on an RDD to get the dataframe you need:
def avgCalc(buffer: Iterable[Array[String]], list: Array[String]) = {
  // average the rssi values of `buffer` (one group, sorted latest-first) that fall
  // within the 10 seconds up to this row's timestamp
  val currentTimeStamp = list(1).toLong
  var sum = 0.0
  var count = 0
  var check = false
  import scala.util.control.Breaks._
  breakable {
    for (array <- buffer) {
      val toCheckTimeStamp = array(1).toLong
      if (((currentTimeStamp - 10L) <= toCheckTimeStamp) && (currentTimeStamp >= toCheckTimeStamp)) {
        sum += array(5).toDouble
        count += 1
      }
      // a row older than the window proves a full 10 seconds of history exists
      if ((currentTimeStamp - 10L) > toCheckTimeStamp) {
        check = true
        break
      }
    }
  }
  // average only when the window is backed by older data; otherwise keep the row's own rssi
  if (sum != 0.0 && check) list :+ sum / count
  else list :+ list(5).toDouble
}
import sqlContext.implicits._

val averageDF = sc.textFile("path to your csv file")
  .map(line => line.split(",").map(_.trim))
  .filter(array => array(1) != "timestamp") // drop the header row of the csv
  .groupBy(array => (array(0), array(2))) // group by (tagShortID, ListenerShortID)
  .mapValues(iter => {
    // groupBy does not guarantee the order of its values, so sort each group latest-first
    val buffer = iter.toList.sortBy(array => -array(1).toLong)
    buffer.map(list => avgCalc(buffer, list))
  })
  .flatMap(x => x._2)
  .map(x => Jessi(x(0).toString, x(1).toString.toLong, x(2).toString, x(3).toString, x(4).toString, x(5).toString.toDouble, x(6).toString.toDouble))
  .toDF

averageDF.show
Where Jessi is a case class
case class Jessi(tagShortID: String, Timestamp: Long, ListenerShortID: String, rootOrgID: String, subOrgID: String, RSSI_Weight: Double, RSSI_Weight_avg: Double)
so you should have the following output:
+----------+---------+---------------+---------+--------+-----------+-------------------+
|tagShortID|Timestamp|ListenerShortID|rootOrgID|subOrgID|RSSI_Weight|RSSI_Weight_avg |
+----------+---------+---------------+---------+--------+-----------+-------------------+
|4 |150 |1901 |4 |3 |0.9 |0.58 |
|4 |149 |1901 |4 |3 |0.3 |0.5 |
|4 |148 |1901 |4 |3 |0.4 |0.5666666666666668 |
|4 |147 |1901 |4 |3 |0.7 |0.6499999999999999 |
|4 |143 |1901 |4 |3 |0.6 |0.44999999999999996|
|4 |133 |1901 |4 |3 |0.3 |0.525 |
|4 |131 |1901 |4 |3 |0.6 |0.6 |
|4 |129 |1901 |4 |3 |0.8 |0.6000000000000001 |
|4 |128 |1901 |4 |3 |0.4 |0.4 |
|4 |111 |1901 |4 |3 |0.6 |0.6 |
|4 |110 |1901 |4 |3 |0.9 |0.9 |
|4 |109 |1901 |4 |3 |0.4 |0.4 |
|4 |104 |1901 |4 |3 |0.3 |0.3 |
|4 |101 |1901 |4 |3 |0.6 |0.6 |
+----------+---------+---------------+---------+--------+-----------+-------------------+
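As an aside, the same result can also be had without dropping down to an RDD, using DataFrame window functions. Below is a minimal sketch, assuming a DataFrame df with the column names from your expected output and a numeric Timestamp: a range frame averages everything within the last 10 seconds, and the when reproduces the rule of keeping the raw rssi when no reading older than the window exists (rows 101 to 111 above).
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val byKey = Window.partitionBy("tagShortID", "ListenerShortID")
// range frame: every row whose Timestamp lies within [current - 10, current]
val last10s = byKey.orderBy("Timestamp").rangeBetween(-10, 0)

val result = df
  .withColumn("minTs", min("Timestamp").over(byKey)) // oldest timestamp per tag/listener
  .withColumn("RSSI_Weight_avg",
    // average only when a reading older than the window start exists;
    // otherwise keep the row's own rssi, as in rows 101-111 of the expected output
    when(col("Timestamp") - col("minTs") > 10, avg("RSSI_Weight").over(last10s))
      .otherwise(col("RSSI_Weight")))
  .drop("minTs")

result.orderBy(col("Timestamp").desc).show(30, false)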