Reputation: 2949
I am getting a "Task not serializable" error for the code below. However, if I pass the function directly, I don't get any error.
import org.apache.spark.rdd.RDD

abstract class MyAbstractClass[T, SortOrder](implicit ord: Ordering[SortOrder]) {

  def getSorterFunc(): (T) => SortOrder

  def sort(rdd: RDD[List[T]]) = {
    val sortFunc = getSorterFunc()
    rdd.map(x => x.sortBy(sortFunc))
  }
}
object SampleObject extends MyAbstractClass[(String, Int, List[Int]), (String, Int)] {
  def getSorterFunc() = {
    case (username, id, addresses) => (username, id)
  }
}
val data = Array(("user1", 1, List(12, 211)), ("u2", 1, List(12, 211)), ("u1", 2, List(12, 211))).toList
val dataList = Array(data, data)
val rdd = sc.parallelize(dataList)
// This is working fine
rdd.map(x => x.sortBy{ case (username, id, addresses) => (username, id) })
// This is giving task not serializable error.
SampleObject.sort(rdd)
rdd.collect
Error:
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:324)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:323)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
What is causing the error?
Upvotes: 1
Views: 333
Reputation: 9225
Looks like there is a pretty simple solution (based on Tzach Zohar's findings):
...
def sort(rdd: RDD[List[T]]) = {
  val sortFunc = getSorterFunc()
  val ordLocal = ord  // copy the implicit member into a local val
  rdd.map(x => x.sortBy(sortFunc)(ordLocal))
}
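Assigning ord to the local val ordLocal and passing it explicitly means the closure captures only the Ordering value itself, not the enclosing object that holds it, so Spark no longer needs to serialize SampleObject.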
Upvotes: 1
Reputation: 37842
The culprit here is the implicit ord: Ordering[SortOrder] - since sortBy (called within an anonymous function that must be serialized by Spark) implicitly takes ord as an argument, and ord is a member of the enclosing instance, the encapsulating object SampleObject must be serializable.
This is why using the local variable sortFunc isn't enough - it does relieve Spark of the need to serialize the object on which getSorterFunc is called, but there's still ord to take care of.
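You can see this by writing the implicit application out explicitly. Inside sort, the call effectively expands to something like this (a sketch of the desugared form, not actual compiler output):
// Sketch: the implicit parameter is resolved to the class member `ord`,
// so the closure references `this` - i.e. the whole enclosing instance.
rdd.map(x => x.sortBy(sortFunc)(this.ord))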
As mentioned in the comments, you can fix it by making SampleObject (or MyAbstractClass) extend Serializable.
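A minimal sketch of that fix, reusing the object from the question:
object SampleObject
    extends MyAbstractClass[(String, Int, List[Int]), (String, Int)]
    with Serializable {
  // Unchanged; the only difference is that the object is now serializable.
  def getSorterFunc() = {
    case (username, id, addresses) => (username, id)
  }
}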
Another option would be to move the implicit parameter from the class constructor (where it becomes a member, so anything that uses it "carries" the whole class along with it when serialized) to the method:
abstract class MyAbstractClass[T, SortOrder] {

  def getSorterFunc(): (T) => SortOrder

  def sort(rdd: RDD[List[T]])(implicit ord: Ordering[SortOrder]): RDD[List[T]] = {
    val sortFunc = getSorterFunc()
    rdd.map(x => x.sortBy(sortFunc))
  }
}
This works without having to serialize the class.
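For example, with the question's SampleObject left unchanged, the Ordering[(String, Int)] is now summoned from Scala's built-in tuple orderings at the call site:
SampleObject.sort(rdd).collect()  // only the Ordering value is captured by the closure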
Upvotes: 4