Reputation: 3513
I am migrating some Scala Spark UDAFs from UserDefinedAggregateFunction to Aggregator. One of them takes as input Array[String], and I am getting a strange exception when executing the Aggregator in a local test. I have reduced my code to a very basic Aggregator, but I'm still getting this error when reading Arrays. I don't have a problem with other input types.
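For context, the old-style UDAF I am migrating away from is roughly equivalent to this (a cut-down sketch of the UserDefinedAggregateFunction version, not my real implementation):

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

class ArrayInputUDAF extends UserDefinedAggregateFunction {
  override def inputSchema: StructType = StructType(Seq(StructField("arr", ArrayType(StringType))))
  override def bufferSchema: StructType = StructType(Seq(StructField("cnt", IntegerType)))
  override def dataType: DataType = IntegerType
  override def deterministic: Boolean = true
  override def initialize(buffer: MutableAggregationBuffer): Unit = buffer(0) = 0
  // in this API the array arrives as a Seq inside the input Row
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit =
    buffer(0) = buffer.getInt(0) + input.getSeq[String](0).length
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit =
    buffer1(0) = buffer1.getInt(0) + buffer2.getInt(0)
  override def evaluate(buffer: Row): Any = buffer.getInt(0)
}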
The simplified Aggregator that still reproduces the problem is as follows:
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

class ArrayInputAggregator extends Aggregator[Array[String], Int, Int] with Serializable {
  // start the count at zero
  override def zero: Int = 0
  // add the length of each incoming array to the running count
  override def reduce(buffer: Int, newItem: Array[String]): Int = buffer + newItem.length
  // combine partial counts from different partitions
  override def merge(b1: Int, b2: Int): Int = b1 + b2
  override def finish(reduction: Int): Int = reduction
  def bufferEncoder: Encoder[Int] = Encoders.scalaInt
  def outputEncoder: Encoder[Int] = Encoders.scalaInt
}
I am testing it with this code:
import org.apache.spark.sql.functions.udaf
import spark.implicits._ // for the $"arr" column syntax

val test = udaf(new ArrayInputAggregator())

val d = spark
  .sql("select array('asd','tre','asd') arr")
  .groupBy()
  .agg(test($"arr").as("cnt"))

d.show
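(For completeness, spark in the test is just a plain local session, along these lines; the app name is arbitrary:)

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("ArrayInputAggregatorTest") // arbitrary name, only used for the local test
  .getOrCreate()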
Instead of the expected single row with cnt = 3 (the length of the one array), this is the exception I am getting:
2023-12-24 12:06:24,678 ERROR spark.executor.Executor - Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.NullPointerException: null
    at org.apache.spark.sql.catalyst.expressions.objects.MapObjects$.apply(objects.scala:682) ~[spark-catalyst_2.12-3.0.0.jar:3.0.0]
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveDeserializer$$anonfun$apply$31$$anonfun$applyOrElse$172$$anonfun$10.applyOrElse(Analyzer.scala:3033) ~[spark-catalyst_2.12-3.0.0.jar:3.0.0]
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveDeserializer$$anonfun$apply$31$$anonfun$applyOrElse$172$$anonfun$10.applyOrElse(Analyzer.scala:3029) ~[spark-catalyst_2.12-3.0.0.jar:3.0.0]
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$1(TreeNode.scala:309) ~[spark-catalyst_2.12-3.0.0.jar:3.0.0]
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72) ~[spark-catalyst_2.12-3.0.0.jar:3.0.0]
(it goes on and on)
Upvotes: 0
Views: 86