Eyal

Reputation: 3513

Spark Aggregator with Array as Input

I am migrating some Scala Spark UDAFs from UserDefinedAggregateFunction to Aggregator. One of them takes an Array[String] as input, and I am getting a strange exception when executing the Aggregator in a local test. I have reduced my code to a very basic Aggregator, but I still get the error when reading arrays. I don't have a problem with other input types.

The simplified example is as follows:

  import org.apache.spark.sql.expressions.Aggregator
  import org.apache.spark.sql.{Encoder, Encoders}

  // Counts the total number of elements across all input arrays
  class ArrayInputAggregator extends Aggregator[Array[String], Int, Int] with Serializable {

    // Initial value of the aggregation buffer
    override def zero: Int = 0

    // Add the length of each incoming array to the buffer
    override def reduce(buffer: Int, newItem: Array[String]): Int = {
      buffer + newItem.length
    }

    // Combine two partial buffers
    override def merge(b1: Int, b2: Int): Int = {
      b1 + b2
    }

    override def finish(reduction: Int): Int = reduction

    override def bufferEncoder: Encoder[Int] = Encoders.scalaInt
    override def outputEncoder: Encoder[Int] = Encoders.scalaInt
  }
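
For comparison, here is a minimal sketch of the same aggregator with a scalar input type (the name StringInputAggregator is not in my original code, just illustrative); registered the same way with udaf, this kind of input does not reproduce the problem:

  import org.apache.spark.sql.expressions.Aggregator
  import org.apache.spark.sql.{Encoder, Encoders}

  // Hypothetical scalar-input variant: identical shape, but the input is a
  // plain String instead of Array[String], and it runs without the exception.
  class StringInputAggregator extends Aggregator[String, Int, Int] with Serializable {
    override def zero: Int = 0
    override def reduce(buffer: Int, newItem: String): Int = buffer + newItem.length
    override def merge(b1: Int, b2: Int): Int = b1 + b2
    override def finish(reduction: Int): Int = reduction
    override def bufferEncoder: Encoder[Int] = Encoders.scalaInt
    override def outputEncoder: Encoder[Int] = Encoders.scalaInt
  }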

I am testing it with this code:

import org.apache.spark.sql.functions.udaf
import spark.implicits._

val test = udaf(new ArrayInputAggregator())

val d = spark
  .sql("select array('asd','tre','asd') arr")
  .groupBy()
  .agg(test($"arr").as("cnt"))

d.show

This is the exception I am getting:

2023-12-24 12:06:24,678 ERROR spark.executor.Executor - Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.NullPointerException: null
    at org.apache.spark.sql.catalyst.expressions.objects.MapObjects$.apply(objects.scala:682) ~[spark-catalyst_2.12-3.0.0.jar:3.0.0]
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveDeserializer$$anonfun$apply$31$$anonfun$applyOrElse$172$$anonfun$10.applyOrElse(Analyzer.scala:3033) ~[spark-catalyst_2.12-3.0.0.jar:3.0.0]
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveDeserializer$$anonfun$apply$31$$anonfun$applyOrElse$172$$anonfun$10.applyOrElse(Analyzer.scala:3029) ~[spark-catalyst_2.12-3.0.0.jar:3.0.0]
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$1(TreeNode.scala:309) ~[spark-catalyst_2.12-3.0.0.jar:3.0.0]
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72) ~[spark-catalyst_2.12-3.0.0.jar:3.0.0]

(it goes on and on)

Upvotes: 0

Views: 86

Answers (1)

shaloms

Reputation: 46

It looks like this bug has been fixed in Spark 3.0.1 and higher.
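
If upgrading is not an option right away, one possible workaround (not part of the original answer, just a sketch) is to skip the custom Aggregator for this particular count and use the built-in size and sum functions, which do not go through the Aggregator's input deserializer:

  import org.apache.spark.sql.functions.{col, size, sum}

  // Workaround sketch for Spark 3.0.0: count array elements with built-in
  // functions instead of the custom Aggregator.
  val d = spark
    .sql("select array('asd','tre','asd') arr")
    .groupBy()
    .agg(sum(size(col("arr"))).as("cnt"))

  d.show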

Upvotes: 1
