Reputation: 18108
I was looking at some examples on blogs of UDFs that appear to work, but when I run them they give the infamous "Task not serializable" error. I find it strange that this was published with no mention of the issue. Running Spark 2.4.
The code is pretty straightforward; has something changed in Spark?
def lowerRemoveAllWhitespace(s: String): String = {
s.toLowerCase().replaceAll("\\s", "")
}
val lowerRemoveAllWhitespaceUDF = udf[String, String](lowerRemoveAllWhitespace)
import org.apache.spark.sql.functions.col
val df = sc.parallelize(Seq(
("r1 ", 1, 1, 3, -2),
("r 2", 6, 4, -2, -2),
("r 3", 4, 1, 1, 0),
("r4", 1, 2, 4, 5)
)).toDF("ID", "a", "b", "c", "d")
df.select(lowerRemoveAllWhitespaceUDF(col("ID"))).show(false)
returns:
org.apache.spark.SparkException: Task not serializable
From this blog, which I find good: https://medium.com/@mrpowers/spark-user-defined-functions-udfs-6c849e39443b
Something must have changed???
I looked at the top-voted answer here that uses an Object and extends Serializable, but no joy either. Puzzled.
EDIT
Things seem to have changed; this format is needed:
val squared = udf((s: Long) => s * s)
I am still interested in why the Object approach failed.
Upvotes: 2
Views: 3745
Reputation: 18108
The example that was posted was from a reputable source, but I could not get it to run without a serialization error on Spark 2.4; trying Objects etc. did not help either.
I solved the issue using the udf((... approach, which looks like it only allows a single statement; indeed I could do that, and voila, no serialization error. A slightly different example, though, using primitives:
val sumContributionsPlus = udf((n1: Int, n2: Int, n3: Int, n4: Int) => Seq(n1,n2,n3,n4).foldLeft(0)( (acc, a) => if (a > 0) acc + a else acc))
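The folding logic inside that UDF can be checked in plain Scala, independent of Spark; the name sumPositive below is illustrative:

```scala
// The same positive-only fold as in the UDF body, outside of Spark,
// just to confirm the accumulator logic: negatives and zero are skipped.
val sumPositive: (Int, Int, Int, Int) => Int =
  (n1, n2, n3, n4) =>
    Seq(n1, n2, n3, n4).foldLeft(0)((acc, a) => if (a > 0) acc + a else acc)

println(sumPositive(1, -2, 3, 0)) // 4
```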
On a final note, the whole discussion of UDFs, Spark-native functions, and Column UDFs is confusing when things no longer appear to work.
Upvotes: 0
Reputation: 4499
I couldn't reproduce the error (tried on Spark 1.6, 2.3, and 2.4), but I do remember facing this kind of error a long time ago, so I'll offer my best guess.
The problem happens due to the difference between a method and a function in Scala, as described in detail here.
The short version is that when you write def, it is equivalent to a method in Java, i.e. part of a class, and it can be invoked using an instance of that class.
When you write udf((s: Long) => s * s), it creates an instance of the trait Function1. For this to happen, an anonymous class implementing Function1 is generated, whose apply method is something like def apply(s: Long): Long = s * s, and an instance of this class is passed as the parameter to udf.
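To make that concrete, here is a plain-Scala sketch (no Spark required) of roughly what the compiler generates for such a function literal; the name squaredFn is illustrative:

```scala
// Roughly what the compiler produces for the literal (s: Long) => s * s:
// an anonymous class implementing Function1, with the body in apply.
// Scala function literals are also serializable, which is what Spark needs
// when it ships the UDF to executors.
val squaredFn: Function1[Long, Long] = new Function1[Long, Long] with Serializable {
  def apply(s: Long): Long = s * s
}

println(squaredFn(4L)) // 16
```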
However, when you write udf[String, String](lowerRemoveAllWhitespace), the method lowerRemoveAllWhitespace needs to be converted to a Function1 instance and passed to udf. This is where serialization fails: the apply method on that instance will try to invoke lowerRemoveAllWhitespace on an instance of another object (which cannot be serialized and sent to the worker JVM process), causing the exception.
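That failure mode can be sketched in plain Scala using Java serialization directly, which is essentially the check Spark performs when shipping a closure. The names Helpers and isSerializable below are illustrative, not Spark API; the assumption is that the method lives in a non-serializable enclosing object, as it typically does in a REPL or notebook:

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Illustrative helper: returns true if the value survives Java serialization.
def isSerializable(value: AnyRef): Boolean =
  try {
    new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(value)
    true
  } catch {
    case _: NotSerializableException => false
  }

// A class that is NOT Serializable, standing in for the enclosing object
// that Spark cannot ship to the worker JVM.
class Helpers {
  def lowerRemoveAllWhitespace(s: String): String =
    s.toLowerCase().replaceAll("\\s", "")
}

val helpers = new Helpers

// Eta-expanding the method captures the `helpers` instance, so the
// resulting Function1 drags the non-serializable Helpers along with it.
val fromMethod: String => String = helpers.lowerRemoveAllWhitespace _

// A plain function literal captures nothing and serializes cleanly.
val fromLambda: String => String = s => s.toLowerCase().replaceAll("\\s", "")

println(isSerializable(fromMethod)) // false: closure holds a Helpers reference
println(isSerializable(fromLambda)) // true
```

This is why moving the body into a function literal passed straight to udf, as in the asker's EDIT, avoids the error: the literal has nothing non-serializable to capture.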
Upvotes: 2