Reputation:
I have an RDD of Rows and I want to filter based on a closure. Ultimately I want to pass the closure in as a parameter to the method that does the filtering, but I've simplified it, and I can reproduce the error with something as simple as this:
def fn(l: Long): Boolean = true
rdd.filter{ row => fn(row.getAs[Long]("field")) }
I've tried putting fn into a case object, into an object that extends a serializable trait, and defining it both inside and outside of the method that calls filter. I'm trying to figure out what I need to do to avoid these errors. I know there are already a lot of questions about this on Stack Overflow and I've been looking for a suitable answer, but I can't find it.
Name: org.apache.spark.SparkException
Message: Task not serializable
StackTrace: org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
org.apache.spark.SparkContext.clean(SparkContext.scala:2058)
org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:341)
org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:340)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
org.apache.spark.rdd.RDD.filter(RDD.scala:340)
$line131.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:43)
$line131.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:48)
$line131.$read$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:50)
$line131.$read$$iwC$$iwC$$iwC$$iwC.<init>(<console>:52)
$line131.$read$$iwC$$iwC$$iwC.<init>(<console>:54)
$line131.$read$$iwC$$iwC.<init>(<console>:56)
$line131.$read$$iwC.<init>(<console>:58)
$line131.$read.<init>(<console>:60)
$line131.$read$.<init>(<console>:64)
$line131.$read$.<clinit>(<console>)
$line131.$eval$.<init>(<console>:7)
$line131.$eval$.<clinit>(<console>)
$line131.$eval.$print(<console>)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:601)
org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1346)
org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
org.apache.toree.kernel.interpreter.scala.ScalaInterpreter$$anonfun$interpretAddTask$1$$anonfun$apply$3.apply(ScalaInterpreter.scala:356)
org.apache.toree.kernel.interpreter.scala.ScalaInterpreter$$anonfun$interpretAddTask$1$$anonfun$apply$3.apply(ScalaInterpreter.scala:351)
org.apache.toree.global.StreamState$.withStreams(StreamState.scala:81)
org.apache.toree.kernel.interpreter.scala.ScalaInterpreter$$anonfun$interpretAddTask$1.apply(ScalaInterpreter.scala:350)
org.apache.toree.kernel.interpreter.scala.ScalaInterpreter$$anonfun$interpretAddTask$1.apply(ScalaInterpreter.scala:350)
org.apache.toree.utils.TaskManager$$anonfun$add$2$$anon$1.run(TaskManager.scala:140)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:722)
UPDATE:
A more complete example. I am running Jupyter with Toree and executing code from a jar file in my cells. Here are three of the things I tried, all of which fail:
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
class NotWorking1(sc: SparkContext, sqlContext: SQLContext, fn: Long => Boolean) {
  def myFilterer(rdd: RDD[Row], longField: String): RDD[Row] =
    rdd.filter { row => fn(row.getAs[Long](longField)) }
}

object NotWorking1 {
  def apply(sc: SparkContext, sqlContext: SQLContext) = {
    def myFn(l: Long): Boolean = true
    new NotWorking1(sc, sqlContext, myFn)
  }
}

class NotWorking2(sc: SparkContext, sqlContext: SQLContext) {
  def myFn(l: Long): Boolean = true

  def myFilterer(rdd: RDD[Row], longField: String): RDD[Row] = {
    rdd.filter { row => myFn(row.getAs[Long](longField)) }
  }
}

object NotWorking2 {
  def apply(sc: SparkContext, sqlContext: SQLContext) = {
    new NotWorking2(sc, sqlContext)
  }
}

class NotWorking3(sc: SparkContext, sqlContext: SQLContext) {
  def myFilterer(rdd: RDD[Row], longField: String): RDD[Row] = {
    def myFn(l: Long): Boolean = true
    rdd.filter { row => myFn(row.getAs[Long](longField)) }
  }
}

object NotWorking3 {
  def apply(sc: SparkContext, sqlContext: SQLContext) = {
    new NotWorking3(sc, sqlContext)
  }
}
From a Jupyter cell, I import the appropriate class and run:
val nw1 = NotWorking1(sc, sqlContext)
val nw2 = NotWorking2(sc, sqlContext)
val nw3 = NotWorking3(sc, sqlContext)
nw1.myFilterer(rdd, "field")
nw2.myFilterer(rdd, "field")
nw3.myFilterer(rdd, "field")
All three fail. NotWorking3 is particularly surprising. Is there anything I can do to isolate the function so that Spark doesn't try to serialize the whole object (which I believe would get me into trouble anyway, since I keep references to the Spark and SQL contexts)?
Upvotes: 0
Views: 1701
Reputation: 530
In my experience, the easiest way is to use function values instead of methods when you want them to be serializable. In other words, if you want a piece of code to be shipped to the executors, define it with val, not def.
In your example, change myFn in NotWorking3 as below and it will work:
val myFn = (l: Long) => true
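For intuition, here is a minimal sketch (the class names UsesDef and UsesVal are made up for illustration, not taken from the question): a def compiles to a method on its enclosing class, so a closure that calls it must capture this and drag the whole instance to the executors, while a val holds a standalone function object that the Scala compiler emits as serializable. Note that reading a val field from inside a closure still references this, which is why the Working1 and Working2 versions below also extend Serializable, whereas a val that is local to the method (as in Working3) avoids capturing the instance entirely.
import org.apache.spark.rdd.RDD

// Hypothetical classes, only to illustrate the difference:
class UsesDef {
  def pred(l: Long): Boolean = true
  // A def is a method on UsesDef, so a closure calling it needs `this`;
  // Spark would then have to serialize the whole UsesDef instance.
}

class UsesVal extends Serializable {
  val pred: Long => Boolean = _ => true
  // A val holds a standalone function object, which the compiler emits as a
  // serializable instance. Reading the field from a closure still goes through
  // `this.pred`, which is why this class is Serializable; copying it into a
  // local first avoids capturing the instance at all:
  def filterWithLocal(rdd: RDD[Long]): RDD[Long] = {
    val p = pred          // local copy; the closure captures only `p`
    rdd.filter(p)
  }
}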
Update:
For NotWorking1 and NotWorking2, along with using val instead of def, you also need to extend the Serializable trait (an explicit @SerialVersionUID annotation is a good idea as well). Here are working versions of your examples (with slight changes here and there):
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
@SerialVersionUID(100L)
class Working1(sc: SparkContext, sqlContext: SQLContext, fn: Long => Boolean) extends Serializable {
  def myFilterer(rdd: RDD[Row]): RDD[Row] = rdd.filter { row => fn(row.getAs[Long](0)) }
}

@SerialVersionUID(101L)
class Working2(sc: SparkContext, sqlContext: SQLContext) extends Serializable {
  val myFn = (l: Long) => true

  def myFilterer(rdd: RDD[Row]): RDD[Row] = {
    rdd.filter { row => myFn(row.getAs[Long](0)) }
  }
}

class Working3(sc: SparkContext, sqlContext: SQLContext) {
  def myFilterer(rdd: RDD[Row]): RDD[Row] = {
    val myFn = (l: Long) => true
    rdd.filter { row => myFn(row.getAs[Long](0)) }
  }
}
val myFnGlobal = (l: Long) => true
val r1 = sc.parallelize(List(1L,2L,3L,4L,5L,6L,7L)).map(x => Row(x))
val w1 = new Working1(sc, sqlContext, myFnGlobal)
val w2 = new Working2(sc, sqlContext)
val w3 = new Working3(sc, sqlContext)
w1.myFilterer(r1).collect
w2.myFilterer(r1).collect
w3.myFilterer(r1).collect
Upvotes: 1
Reputation: 63022
The answer from @JustinPihony is the right one: Jupyter creates a class on the fly containing the code you have typed into its session and then submits it to Spark on your behalf. The fn you created therefore drags that enclosing class along with it.
You may need to package your custom logic into a user-defined jar file and include it on the Jupyter classpath. The procedure for adding it to the classpath depends on which Jupyter kernel you are using.
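For example, with the Apache Toree Scala kernel mentioned in the question, something along these lines should work (a hedged sketch: the jar path is a placeholder, and the exact startup configuration depends on how the kernel was installed):
// In a Toree Scala cell, the %AddJar magic makes a jar available to the
// session (the path below is a placeholder for your own jar):
%AddJar file:///path/to/my-filter-logic.jar

// Alternatively, start the kernel with the jar already attached, e.g. by
// adding --jars /path/to/my-filter-logic.jar to the Spark options the kernel
// passes to spark-submit.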
Upvotes: 0