I'm new to Scala and Spark. I wrote a simple test class and have been stuck on this issue for the whole day. Please see the code below.
A.scala
class A(key: String) extends Serializable {
  def getKey(): String = key
}
B.scala
class B(key: String) extends Serializable {
  def getKey(): String = key
}
Test.scala
import com.holdenkarau.spark.testing.{RDDComparisons, SharedSparkContext}
import org.scalatest.FunSuite
import org.scalatest.BeforeAndAfter

class Test extends FunSuite with SharedSparkContext with RDDComparisons with BeforeAndAfter with Serializable {
  // comment this out
  private[this] val b1 = new B("test1")

  test("Test RDD") {
    val a1 = new A("test1")
    val a2 = new A("test2")
    val expected = sc.parallelize(Seq(a1, a2))
    println(b1.getKey())
    //val b1 = new B("test1")
    //val key1: String = b1.getKey()
    expected.foreach { a =>
      //if (a.getKey().equalsIgnoreCase(key1))
      if (a.getKey().equalsIgnoreCase(b1.getKey()))
        print("hi")
    }
  }
}
This code throws the following exception:
Task not serializable
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:393)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:926)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:925)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.foreach(RDD.scala:925)
at com.adgear.adata.hhid.Test$$anonfun$1.apply$mcV$sp(Test.scala:19)
at com.adgear.adata.hhid.Test$$anonfun$1.apply(Test.scala:11)
at com.adgear.adata.hhid.Test$$anonfun$1.apply(Test.scala:11)
at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
at org.scalatest.Transformer.apply(Transformer.scala:22)
at org.scalatest.Transformer.apply(Transformer.scala:20)
at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:186)
at org.scalatest.TestSuite$class.withFixture(TestSuite.scala:196)
at org.scalatest.FunSuite.withFixture(FunSuite.scala:1560)
at org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:183)
at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196)
at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196)
at org.scalatest.SuperEngine.runTestImpl(Engine.scala:289)
at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:196)
at org.scalatest.FunSuite.runTest(FunSuite.scala:1560)
at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229)
at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229)
at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:396)
at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:384)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)
at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:379)
at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:461)
at org.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:229)
at org.scalatest.FunSuite.runTests(FunSuite.scala:1560)
at org.scalatest.Suite$class.run(Suite.scala:1147)
at org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1560)
at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:233)
at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:233)
at org.scalatest.SuperEngine.runImpl(Engine.scala:521)
at org.scalatest.FunSuiteLike$class.run(FunSuiteLike.scala:233)
at com.adgear.adata.hhid.Test.org$scalatest$BeforeAndAfterAll$$super$run(Test.scala:7)
at org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:213)
at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:210)
at com.adgear.adata.hhid.Test.run(Test.scala:7)
When I comment out the class-level declaration of b1 and declare it inside the test method instead, "if(a.getKey().equalsIgnoreCase(b1.getKey()))" works. If I keep the class-level b1 definition, the same line throws the above exception. To work around it, I have to use "val key1 :String = b1.getKey()" and "if(a.getKey().equalsIgnoreCase(key1))", and then it works.
As you can see, A, B, and Test all implement Serializable, yet I still get this exception. What is causing this issue?
Thanks
Declaring a class as Serializable doesn't mean that it can actually be serialized: all of its fields must be Serializable as well.
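A minimal sketch of this point using plain Java serialization outside Spark (Spark serializes closures with Java serialization). The classes `Helper` and `Holder` and the function `canSerialize` are hypothetical, made up for illustration: `Holder` is marked Serializable but holds a `Helper`, which is not.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical class that does NOT extend Serializable.
class Helper

// Marked Serializable, but its `helper` field is not serializable.
class Holder(val name: String) extends Serializable {
  val helper = new Helper
}

object SerializableFieldDemo {
  // Tries Java serialization of `obj`; returns false if some reachable object is not Serializable.
  def canSerialize(obj: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      out.writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    } finally {
      out.close()
    }
  }

  def main(args: Array[String]): Unit = {
    println(canSerialize("plain string"))  // true
    println(canSerialize(new Holder("x"))) // false: Holder.helper is a Helper
  }
}
```

The `extends Serializable` on `Holder` is only a marker; the serializer still walks every field and fails on the first one whose class is not Serializable.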
Since your Test class extends FunSuite, it has an "assertionsHelper" field, which is not Serializable. When you reference the "b1" field inside your "foreach" closure, you are really referencing "this.b1", so the closure captures the whole Test instance, and Spark tries to serialize that instance along with all of its fields (including assertionsHelper).
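To see the capture without Spark, here is a sketch assuming Scala 2.12 or later, where function literals compile to serializable lambdas that capture `this` when their body uses an instance member. `FakeSuite` is a hypothetical stand-in for the Test class, `Helper` plays the role of the non-serializable assertionsHelper, and `canSerialize` is a hypothetical helper doing plain Java serialization.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Same shape as B in the question.
class B(key: String) extends Serializable {
  def getKey(): String = key
}

// Hypothetical stand-in for ScalaTest's non-serializable assertion helper.
class Helper

// Hypothetical stand-in for the Test class: Serializable, but with a non-serializable field.
class FakeSuite extends Serializable {
  val assertionsHelper = new Helper
  val b1 = new B("test1")

  // The body reads b1, i.e. this.b1, so the function captures the whole FakeSuite.
  def keyMatcher: String => Boolean =
    a => a.equalsIgnoreCase(b1.getKey())
}

object CaptureDemo {
  // Tries Java serialization of `obj`; returns false on NotSerializableException.
  def canSerialize(obj: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      out.writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    } finally {
      out.close()
    }
  }

  def main(args: Array[String]): Unit = {
    val suite = new FakeSuite
    println(canSerialize(suite.b1))         // true: B on its own is fine
    println(canSerialize(suite.keyMatcher)) // false: the closure drags in suite.assertionsHelper
  }
}
```

B itself serializes without trouble; it is the captured `FakeSuite` instance, reached through the closure, that makes serialization fail.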
To avoid this, either define b1 somewhere else (in the test method's scope or in a companion object), or copy b1 into a local variable before using it in the foreach closure:
val b1Ref = b1
expected.foreach { a =>
  if (a.getKey().equalsIgnoreCase(b1Ref.getKey()))
    print("hi")
}
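A sketch of both workarounds, again outside Spark and assuming Scala 2.12 or later, with plain Java serialization standing in for Spark's closure check. `FakeSuite` is a hypothetical stand-in for the Test class that carries a non-serializable `Helper` field; `localCopyMatcher`, `companionMatcher` and `sharedB1` are made-up names for illustration.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

class B(key: String) extends Serializable {
  def getKey(): String = key
}

// Hypothetical non-serializable field type, like ScalaTest's assertion helper.
class Helper

class FakeSuite extends Serializable {
  val assertionsHelper = new Helper
  val b1 = new B("test1")

  // Workaround 1: copy the field into a local val; the function captures only that B.
  def localCopyMatcher: String => Boolean = {
    val b1Ref = b1
    a => a.equalsIgnoreCase(b1Ref.getKey())
  }

  // Workaround 2: read the value from the companion object, which is accessed
  // statically, so nothing from the FakeSuite instance is captured.
  def companionMatcher: String => Boolean =
    a => a.equalsIgnoreCase(FakeSuite.sharedB1.getKey())
}

object FakeSuite {
  val sharedB1 = new B("test1")
}

object FixDemo {
  // Tries Java serialization of `obj`; returns false on NotSerializableException.
  def canSerialize(obj: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      out.writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    } finally {
      out.close()
    }
  }

  def main(args: Array[String]): Unit = {
    val suite = new FakeSuite
    println(canSerialize(suite.localCopyMatcher)) // true
    println(canSerialize(suite.companionMatcher)) // true
  }
}
```

With the companion-object variant on a real cluster, each executor JVM initializes its own copy of the object, which is fine for a constant like this but not for mutable shared state.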
PS: When you hit a serialization exception, you usually have access to a "Serialization stack" in the exception output or logs, which tells you exactly which object caused the error.
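If the serialization stack is not in the output you have, a quick local check can at least name the offending class, since `java.io.NotSerializableException` uses the name of the class that could not be serialized as its message. The sketch below is a hypothetical test helper (`firstNonSerializable` is not a Spark API) that runs plain Java serialization on an object, for example a function, before it is handed to an RDD action; `Helper` and `Holder` are made-up example types.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object SerializationCheck {
  // Returns the class name reported by the first NotSerializableException, if any.
  def firstNonSerializable(obj: AnyRef): Option[String] = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      out.writeObject(obj)
      None
    } catch {
      case e: NotSerializableException => Some(e.getMessage)
    } finally {
      out.close()
    }
  }
}

// Hypothetical example types.
class Helper
class Holder extends Serializable {
  val helper = new Helper
}

object SerializationCheckDemo {
  def main(args: Array[String]): Unit = {
    println(SerializationCheck.firstNonSerializable("ok"))       // None
    println(SerializationCheck.firstNonSerializable(new Holder)) // Some(...) naming the Helper class
  }
}
```

In a test, asserting `SerializationCheck.firstNonSerializable(f).isEmpty` before calling `rdd.foreach(f)` would fail with the culprit's class name rather than a generic "Task not serializable".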