Reputation: 3782
When I create the UDF shown below, I get a Task serialization error. The error appears only when I run the code in cluster deploy mode with spark-submit; it works fine in spark-shell.
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import scala.collection.mutable.WrappedArray

def mfnURL(arr: WrappedArray[String]): String = {
  val filterArr = arr.filterNot(_ == null)
  if (filterArr.length == 0)
    return null
  else {
    filterArr.groupBy(identity).maxBy(_._2.size)._1
  }
}
val mfnURLUDF = udf(mfnURL _)

def windowSpec = Window.partitionBy("nodeId", "url", "typology")

val result = df.withColumn("count", count("url").over(windowSpec))
  .orderBy($"count".desc)
  .groupBy("nodeId", "typology")
  .agg(
    first("url"),
    mfnURLUDF(collect_list("source_url")),
    min("minTimestamp"),
    max("maxTimestamp")
  )
I tried adding spark.udf.register("mfnURLUDF", mfnURLUDF), but it did not solve the problem.
Upvotes: 2
Views: 1228
Reputation: 476
You can also try creating the UDF as a function literal:
import org.apache.spark.sql.functions.udf
import scala.collection.mutable.WrappedArray

// Returns the most frequent non-null URL, or null if none remain.
// `return` is not allowed inside a function literal, so the if/else
// is written as an expression instead.
val mfnURL = udf { arr: WrappedArray[String] =>
  val filterArr = arr.filterNot(_ == null)
  if (filterArr.isEmpty) null
  else filterArr.groupBy(identity).maxBy(_._2.size)._1
}
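Defining the UDF as a val holding a function literal, rather than lifting a method with mfnURL _, avoids capturing the enclosing class instance, which is a common cause of Task not serializable errors when the code is compiled and submitted with spark-submit instead of typed into spark-shell. As a rough sketch (assuming the same df, column names, and windowSpec from the question), it would slot into the aggregation like this:
val result = df.withColumn("count", count("url").over(windowSpec))
  .orderBy($"count".desc)
  .groupBy("nodeId", "typology")
  .agg(
    first("url"),
    mfnURL(collect_list("source_url")), // function-value UDF defined above
    min("minTimestamp"),
    max("maxTimestamp")
  )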
Upvotes: 2