user1514879

Spark Scala task not serializable for closure

I have an RDD of Rows and I want to filter it based on a closure. Ultimately I want to pass the closure in as a parameter to the method that does the filtering, but I've simplified things and can reproduce the error with something as simple as this:

def fn(l: Long): Boolean = true
rdd.filter{ row => fn(row.getAs[Long]("field")) }

I tried putting fn into a case object, into an object that extends a serializable trait, and defining it both inside and outside the method that calls filter. I'm trying to figure out what I need to do to avoid these errors. I know there are already a lot of questions about this on Stack Overflow and I've been looking for a suitable answer, but I can't find one.

Name: org.apache.spark.SparkException
Message: Task not serializable
StackTrace: org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
org.apache.spark.SparkContext.clean(SparkContext.scala:2058)
org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:341)
org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:340)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
org.apache.spark.rdd.RDD.filter(RDD.scala:340)
$line131.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:43)
$line131.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:48)
$line131.$read$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:50)
$line131.$read$$iwC$$iwC$$iwC$$iwC.<init>(<console>:52)
$line131.$read$$iwC$$iwC$$iwC.<init>(<console>:54)
$line131.$read$$iwC$$iwC.<init>(<console>:56)
$line131.$read$$iwC.<init>(<console>:58)
$line131.$read.<init>(<console>:60)
$line131.$read$.<init>(<console>:64)
$line131.$read$.<clinit>(<console>)
$line131.$eval$.<init>(<console>:7)
$line131.$eval$.<clinit>(<console>)
$line131.$eval.$print(<console>)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:601)
org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1346)
org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
org.apache.toree.kernel.interpreter.scala.ScalaInterpreter$$anonfun$interpretAddTask$1$$anonfun$apply$3.apply(ScalaInterpreter.scala:356)
org.apache.toree.kernel.interpreter.scala.ScalaInterpreter$$anonfun$interpretAddTask$1$$anonfun$apply$3.apply(ScalaInterpreter.scala:351)
org.apache.toree.global.StreamState$.withStreams(StreamState.scala:81)
org.apache.toree.kernel.interpreter.scala.ScalaInterpreter$$anonfun$interpretAddTask$1.apply(ScalaInterpreter.scala:350)
org.apache.toree.kernel.interpreter.scala.ScalaInterpreter$$anonfun$interpretAddTask$1.apply(ScalaInterpreter.scala:350)
org.apache.toree.utils.TaskManager$$anonfun$add$2$$anon$1.run(TaskManager.scala:140)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:722)

UPDATE:

A more complete example. I am running Jupyter with Toree and executing code from a jar file in my cells. Here are three of the things I tried, all of which fail:

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}

class NotWorking1(sc: SparkContext, sqlContext: SQLContext, fn: Long=>Boolean) {
  def myFilterer(rdd:RDD[Row], longField: String): RDD[Row] = rdd.filter{ row => fn(row.getAs[Long](longField)) }
}

object NotWorking1 {
  def apply(sc: SparkContext, sqlContext: SQLContext) = {
    def myFn(l: Long): Boolean = true
    new NotWorking1(sc, sqlContext, myFn)
  }
}

class NotWorking2(sc: SparkContext, sqlContext: SQLContext) {
  def myFn(l: Long): Boolean = true

  def myFilterer(rdd:RDD[Row], longField: String): RDD[Row] = {
    rdd.filter{ row => myFn(row.getAs[Long](longField)) }
  }
}

object NotWorking2 {
  def apply(sc: SparkContext, sqlContext: SQLContext) = {
    new NotWorking2(sc, sqlContext)
  }
}

class NotWorking3(sc: SparkContext, sqlContext: SQLContext) {
  def myFilterer(rdd:RDD[Row], longField: String): RDD[Row] = {
    def myFn(l: Long): Boolean = true
    rdd.filter{ row => myFn(row.getAs[Long](longField)) }
  }
}

object NotWorking3 {
  def apply(sc: SparkContext, sqlContext: SQLContext) = {
    new NotWorking3(sc, sqlContext)
  }
}

From the Jupyter cell, I import the appropriate classes and run:

val nw1 = NotWorking1(sc, sqlContext)
val nw2 = NotWorking2(sc, sqlContext)
val nw3 = NotWorking3(sc, sqlContext)
nw1.myFilterer(rdd, "field")
nw2.myFilterer(rdd, "field")
nw3.myFilterer(rdd, "field")

All three fail. NotWorking3 is particularly surprising. Is there anything I can do to isolate the function so that Spark doesn't try to serialize the whole object (which I believe would get me into trouble anyway, since I'm keeping references to the Spark and SQL contexts)?

Upvotes: 0

Views: 1701

Answers (2)

soorajmr

Reputation: 530

The easiest way in my experience is to use function values instead of methods if you want them to be serializable. In other words, if you want a piece of code to be shipped to the executors, define it with val, not def.

In your example, in the class NotWorking3, change myFn as below and it will work:

val myFn = (l: Long) => true
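
If you also want to be sure the closure does not capture the enclosing instance at all (the question mentions wanting to avoid serializing the object that holds the Spark and SQL contexts), a common trick is to copy the function into a local val inside the method, so the closure captures only that local value and not this. A rough sketch (the class name Isolated is made up, not from the original code; imports as above):

class Isolated(sc: SparkContext, sqlContext: SQLContext) {
  val myFn = (l: Long) => true

  def myFilterer(rdd: RDD[Row], longField: String): RDD[Row] = {
    // copy the function into a local so the closure captures only fn,
    // not the enclosing instance (this) and its context references
    val fn = myFn
    rdd.filter { row => fn(row.getAs[Long](longField)) }
  }
}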

Update:

For NotWorking1 and NotWorking2, along with using val instead of def, you also need to extend the Serializable trait and use the @SerialVersionUID annotation. Here is a working version of your examples (with slight changes here and there):

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}

@SerialVersionUID(100L)
class Working1(sc: SparkContext, sqlContext: SQLContext, fn: Long=>Boolean) extends Serializable{
  def myFilterer(rdd:RDD[Row]): RDD[Row] = rdd.filter{ row => fn(row.getAs[Long](0)) }
}

@SerialVersionUID(101L)
class Working2 (sc: SparkContext, sqlContext: SQLContext) extends Serializable{
  val myFn = (l: Long) => true

  def myFilterer(rdd:RDD[Row]): RDD[Row] = {
    rdd.filter{ row => myFn(row.getAs[Long](0)) }
  }
}

class Working3 (sc: SparkContext, sqlContext: SQLContext) {
  def myFilterer(rdd:RDD[Row]): RDD[Row] = {
    val myFn = (l: Long) => true
    rdd.filter{ row => myFn(row.getAs[Long](0)) }
  }
}

val myFnGlobal = (l: Long) => true
val r1 = sc.parallelize(List(1L,2L,3L,4L,5L,6L,7L)).map(x => Row(x))

val w1 = new Working1(sc, sqlContext, myFnGlobal)
val w2 = new Working2(sc, sqlContext)
val w3 = new Working3(sc, sqlContext)
w1.myFilterer(r1).collect
w2.myFilterer(r1).collect
w3.myFilterer(r1).collect

Upvotes: 1

WestCoastProjects

Reputation: 63022

The answer from @JustinPihony is the right one: Jupyter creates a class on the fly containing the code you have typed into its session and then submits it to Spark on your behalf. The fn you created therefore requires that enclosing class to be serialized along with it.
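
One way to see this from a cell is to try Java-serializing the closure yourself before Spark does; if the generated wrapper class gets dragged in, the write fails with a NotSerializableException naming it. A small throwaway helper (plain java.io, not a Spark API; roughly what the ensureSerializable check in the stack trace does with the default Java serializer):

import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

def isSerializable(obj: AnyRef): Boolean =
  try {
    // attempt the same kind of Java serialization Spark will attempt
    val oos = new ObjectOutputStream(new ByteArrayOutputStream())
    oos.writeObject(obj)
    oos.close()
    true
  } catch {
    case _: NotSerializableException => false
  }

// e.g. isSerializable((l: Long) => true) vs. a closure that calls a method on this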

You may need to package your custom logic into a user-defined jar file and include it on the Jupyter classpath. The procedure for adding it to the classpath depends on which Jupyter kernel you are using.
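
As a sketch of what that jarred-up logic might look like (the package and object names below are made up), putting the predicate and the filter into a top-level serializable object means the closure references stable compiled code rather than the notebook's generated wrapper class:

// compiled into your own jar and added to the kernel's classpath
package com.example.filters

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row

object RowFilters extends Serializable {
  val fn: Long => Boolean = _ => true

  def filterByLong(rdd: RDD[Row], longField: String): RDD[Row] =
    rdd.filter(row => fn(row.getAs[Long](longField)))
}

In the notebook cell you would then simply call RowFilters.filterByLong(rdd, "field") once the jar is on the kernel's classpath.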

Upvotes: 0
