Knight71

Reputation: 2949

What is causing the sort function to be not serializable in Spark?

I am getting a "task not serializable" error for the code below. However, if I pass the function directly (inline), I don't get any error.

import org.apache.spark.rdd.RDD

abstract class MyAbstractClass[T, SortOrder](implicit ord: Ordering[SortOrder]) {
  def getSorterFunc(): (T) => SortOrder
  def sort(rdd: RDD[List[T]]) = {
    val sortFunc = getSorterFunc()
    rdd.map(x => x.sortBy(sortFunc))
  }
}

object SampleObject extends MyAbstractClass[(String, Int, List[Int]), (String, Int)] {
  def getSorterFunc() = {
    case (username, id, addresses) => (username, id)
  }
}

val data = Array(("user1",1,List(12,211)),("u2",1,List(12,211)),("u1",2,List(12,211))).toList
val dataList = Array(data,data)
val rdd = sc.parallelize(dataList)
// This is working fine 
rdd.map(x => x.sortBy{ case (username, id, addresses) => (username, id) }) 
// This is giving task not serializable error.
SampleObject.sort(rdd)
rdd.collect

Error:

 org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
    at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:324)
    at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:323)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)

What is causing the error?

Upvotes: 1

Views: 333

Answers (2)

Victor Moroz

Reputation: 9225

Looks like there is a pretty simple solution (based on Tzach Zohar's findings):

...
def sort(rdd: RDD[List[T]]) = {
  val sortFunc = getSorterFunc()
  val ordLocal = ord
  rdd.map(x => x.sortBy(sortFunc)(ordLocal))
}
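
The effect can be reproduced outside Spark with plain Java serialization, since that is what Spark's ClosureCleaner attempts before shipping a task. A minimal sketch (the `serializes` helper and the `Sorter`/`IntSorter` names are made up for illustration, not from the question):

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Illustrative stand-in for what Spark's ClosureCleaner does:
// try to Java-serialize the closure before shipping it to executors.
def serializes(obj: AnyRef): Boolean =
  try {
    new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)
    true
  } catch { case _: NotSerializableException => false }

abstract class Sorter[T](implicit ord: Ordering[T]) { // note: not Serializable
  // Referencing the member `ord` inside the lambda compiles to `this.ord`,
  // so the closure captures the whole enclosing instance.
  def capturesThis: List[T] => List[T] = xs => xs.sorted(ord)

  // Copying the implicit into a local first means the lambda captures
  // only `ordLocal`, never `this`.
  def capturesLocal: List[T] => List[T] = {
    val ordLocal = ord
    xs => xs.sorted(ordLocal)
  }
}

object IntSorter extends Sorter[Int]

serializes(IntSorter.capturesThis)  // fails: drags in the non-serializable object
serializes(IntSorter.capturesLocal) // succeeds: only Ordering.Int is captured
```

Copying the implicit into a local val breaks the closure's link to the enclosing instance, which is exactly what `val ordLocal = ord` does in the answer above.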

Upvotes: 1

Tzach Zohar

Reputation: 37842

The culprit here is the implicit ord: Ordering[SortOrder] - since sortBy (called within an anonymous function that must be serialized by Spark) implicitly takes ord as an argument, the encapsulating object SampleObject must be serializable.

This is why using the local variable sortFunc isn't enough - indeed this relieves Spark of the need to serialize the object on which getSorterFunc is called, but there's still ord to take care of.

As mentioned in the comments, you can fix it by making SampleObject (or MyAbstractClass) extend Serializable.

Another option would be to move the implicit parameter from the class (which makes it a member, which means it "carries" the class along with it when serialized) to the method:

abstract class MyAbstractClass[T, SortOrder] {

  def getSorterFunc(): (T) => SortOrder

  def sort(rdd: RDD[List[T]])(implicit ord: Ordering[SortOrder]): RDD[List[T]] = {
    val sortFunc = getSorterFunc()
    rdd.map(x => x.sortBy(sortFunc))
  }
}

This works without having to serialize the class.
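
The same plain-Java-serialization check (an illustrative helper mimicking what Spark's ClosureCleaner attempts; the `ParamSorter`/`IntParamSorter` names are made up for this sketch) shows why: once `ord` is a method parameter rather than a class member, the closure captures only that local value and never `this`:

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Illustrative helper: Java-serialize the closure, as Spark does
// before shipping a task to executors.
def serializes(obj: AnyRef): Boolean =
  try {
    new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)
    true
  } catch { case _: NotSerializableException => false }

abstract class ParamSorter[T] { // no implicit member; still not Serializable
  // `ord` is now a method parameter, i.e. a local value, so the
  // returned lambda captures only `ord`, not the enclosing instance.
  def sortFn(implicit ord: Ordering[T]): List[T] => List[T] =
    xs => xs.sorted(ord)
}

object IntParamSorter extends ParamSorter[Int]

serializes(IntParamSorter.sortFn) // succeeds: only Ordering.Int is captured
```

The non-serializable object never enters the closure's captured environment, so nothing about the class itself needs to be serializable.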

Upvotes: 4
