user2430771

Reputation: 1422

object not serializable org.apache.spark.SparkContext

I was learning about broadcast variables in Spark so I tried to make use of it. I'm using spark-shell (Version 1.6.0). Following is my code:

scala> val pageurls = sc.parallelize(List(("www.google.com","Google"),("www.yahoo.com","Yahoo")))
pageurls: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[0] at parallelize at <console>:27

scala> val pageCounts = sc.parallelize(List(("www.google.com",90),("www.yahoo.com",10)))
pageCounts: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[1] at parallelize at <console>:27

scala> val pageMaps = pageurls.collectAsMap
pageMaps: scala.collection.Map[String,String] = Map(www.yahoo.com -> Yahoo, www.google.com -> Google)

scala> val bMaps = sc.broadcast(pageMaps)
bMaps: org.apache.spark.broadcast.Broadcast[scala.collection.Map[String,String]] = Broadcast(2)

scala> bMaps.value
res0: scala.collection.Map[String,String] = Map(www.yahoo.com -> Yahoo, www.google.com -> Google)

scala> val newRdd = pageCounts.map{
     | case (url,count) => (url,bMaps.value(url),count)}
newRdd: org.apache.spark.rdd.RDD[(String, String, Int)] = MapPartitionsRDD[2] at map at <console>:35

scala> newRdd.collect
res1: Array[(String, String, Int)] = Array((www.google.com,Google,90), (www.yahoo.com,Yahoo,10))

The code worked fine when I ran spark-shell and used the default SparkContext sc that gets created when spark-shell is invoked. However, I then created my own SparkContext and tried to run the same sequence of code. Before creating my own context, I stopped the default SparkContext using sc.stop:

sc.stop
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
val conf = new SparkConf().setMaster("local").setAppName("MyApp")
val sc = new SparkContext(conf)

When I create the SparkContext like this and use the broadcast variable, I get the following exception: org.apache.spark.SparkException: Task not serializable

Caused by: java.io.NotSerializableException: org.apache.spark.SparkConf

Why does this happen, and what should I do so that I don't get these errors? Is there anything I'm missing?

Upvotes: 0

Views: 4271

Answers (1)

m-bhole

Reputation: 1189

When you start spark-shell, it creates a SparkContext [ sc ] for you. One JVM can only have one SparkContext, and you are trying to create another SparkContext in the same JVM. It seems that in the Spark version you are on, it is the SparkConf that ends up throwing the not-serializable exception. To avoid this exception, use:

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
val conf = new SparkConf()
conf.setAppName("MyApp")
conf.set("spark.driver.allowMultipleContexts", "true")
conf.setMaster("local")
val sc = new SparkContext(conf)

References: a] Multiple SparkContext detected in the same JVM

b] https://issues.apache.org/jira/browse/SPARK-2243

Edit: Solution 1: Wrap the broadcast logic in a function defined inside an object and call it from the shell:

sc.stop
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
val conf = new SparkConf().setMaster("local").setAppName("MyApp")
val sc = new SparkContext(conf)
val pageurls = sc.parallelize(List(("www.google.com","Google"),  ("www.yahoo.com","Yahoo")))
val pageCounts = sc.parallelize(List(("www.google.com",90),("www.yahoo.com",10)))
val pageMaps = pageurls.collectAsMap
object Test {
  def bVar(sc: SparkContext, pageMaps: scala.collection.Map[String, String]) = {
    val bMaps = sc.broadcast(pageMaps)
    val newRdd = pageCounts.map { case (url, count) => (url, bMaps.value(url), count) }
    newRdd.collect
  }
}
val result = Test.bVar(sc, pageMaps)
result: Array[(String, String, Int)] = Array((www.google.com,Google,90), (www.yahoo.com,Yahoo,10))
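
The function/object wrapper works because a closure defined inside a standalone object does not capture the shell's enclosing line object, which is what drags the non-serializable SparkConf into the task. As a variant sketch (the BroadcastJoin name and the extra counts parameter are my own additions, not part of the original answer), you can pass everything the closure needs in explicitly:

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

object BroadcastJoin {
  // Everything the closure needs comes in as a parameter, so the generated
  // closure references only the broadcast handle and the RDD, never the
  // shell's line object that holds the non-serializable conf/sc.
  def joinWithNames(sc: SparkContext,
                    names: scala.collection.Map[String, String],
                    counts: RDD[(String, Int)]): Array[(String, String, Int)] = {
    val bNames = sc.broadcast(names)
    counts.map { case (url, count) => (url, bNames.value(url), count) }.collect
  }
}

val result2 = BroadcastJoin.joinWithNames(sc, pageMaps, pageCounts)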

Reference: Spark Accumulator throws "Task not serializable" error

Solution 2: If you would rather not wrap the code in a function called from the shell, mark the SparkContext and SparkConf as @transient.

sc.stop
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
@transient val conf = new SparkConf().setMaster("local").setAppName("MyApp")
@transient val sc = new SparkContext(conf)
val pageurls = sc.parallelize(List(("www.google.com","Google"), ("www.yahoo.com","Yahoo")))
val pageCounts = sc.parallelize(List(("www.google.com",90),("www.yahoo.com",10)))
val pageMaps = pageurls.collectAsMap
val bMaps = sc.broadcast(pageMaps)
bMaps.value
val newRdd = pageCounts.map{case (url,count) => (url,bMaps.value(url),count)}
newRdd.collect
res3: Array[(String, String, Int)] = Array((www.google.com,Google,90), (www.yahoo.com,Yahoo,10))

Reference: Should I leave the variable as transient?

http://fdahms.com/2015/10/14/scala-and-the-transient-lazy-val-pattern/
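
For context, the second link describes the @transient lazy val idiom: a field that cannot be serialized is skipped during serialization and rebuilt lazily in whichever JVM touches it first. A minimal sketch (the LoggingMapper class is illustrative and not part of the original answer), using log4j's Logger, which is not serializable:

class LoggingMapper extends Serializable {
  // Skipped during serialization; recreated on first use in each JVM
  // (driver or executor) after deserialization.
  @transient lazy val log = org.apache.log4j.Logger.getLogger("MyApp")

  def tag(url: String): String = {
    log.debug("tagging " + url)
    url.toUpperCase
  }
}

val mapper = new LoggingMapper
pageCounts.map { case (url, count) => (mapper.tag(url), count) }.collect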

Upvotes: 1
