Reputation: 191
Hi I am trying UDAF with spark scala. I am getting the following exception.
Description Resource Path Location Type type mismatch; found : scala.collection.immutable.IndexedSeq[Any] required: String SumCalc.scala /filter line 63 Scala Problem
This is my code.
override def evaluate(buffer: Row): Any = {
val in_array = buffer.getAs[WrappedArray[String]](0);
var finalArray = Array.empty[Array[String]]
import scala.util.control.Breaks._
breakable {
for (outerArray <- in_array) {
val currentTimeStamp = outerArray(1).toLong
var sum = 0.0
var count = 0
var check = false
var list = outerArray
for (array <- in_array) {
val toCheckTimeStamp = array(1).toLong
if (((currentTimeStamp - 10L) <= toCheckTimeStamp) && (currentTimeStamp >= toCheckTimeStamp)) {
sum += array(5).toDouble
count += 1
}
if ((currentTimeStamp - 10L) > toCheckTimeStamp) {
check = true
break
}
}
if (sum != 0.0 && check) list = list :+ (sum).toString // getting error on this line.
else list = list :+ list(5).toDouble.toString
finalArray ++= Array(list)
}
finalArray
}
}
Any help will be appreciated.
Upvotes: 0
Views: 364
Reputation: 41957
There are a couple of mistakes in your evaluate
function of UDAF
.
list
variable is a string
but you are treating it as an array
finalArray
is initialized as Array.empty[Array[String]]
but later on you are adding Array(list)
to the finalArray
You are not returning finalArray
from evaluate
method as its inside for
loop
So the correct way should be as below
override def evaluate(buffer: Row): Any = {
val in_array = buffer.getAs[WrappedArray[String]](0);
var finalArray = Array.empty[String]
import scala.util.control.Breaks._
breakable {
for (outerArray <- in_array) {
val currentTimeStamp = outerArray(1).toLong // timestamp values
var sum = 0.0
var count = 0
var check = false
var list = outerArray
for (array <- in_array) {
val toCheckTimeStamp = array(1).toLong
if (((currentTimeStamp - 10L) <= toCheckTimeStamp) && (currentTimeStamp >= toCheckTimeStamp)) {
sum += array(5).toDouble // RSSI weightage values
count += 1
}
if ((currentTimeStamp - 10L) > toCheckTimeStamp) {
check = true
break
}
}
if (sum != 0.0 && check) list = list + (sum).toString // calculate sum for the 10 secs difference
else list = list + (sum).toString // If 10 secs difference is not there take rssi weightage value
finalArray ++= Array(list)
}
}
finalArray // Final results for this function
}
Hope the answer is helpful
Upvotes: 1