Darshan

Reputation: 81

org.apache.spark.SparkException: Task not serializable in scala

I am new to Scala. I am executing below code on Spark:

scala>   for(line <- sc.textFile("hdfs://ip:8020/property.conf")) 
         {
            val c = line.split("=")
            SparkConf.set(c(0), c(1)) 
            //println(c(0)+" "+c(1))   //Commented
         }

If I uncomment the println line and comment out SparkConf.set(c(0), c(1)) instead, it works fine and displays the data.

But in this case I want to set parameters on SparkConf at runtime, and then it throws me the error

org.apache.spark.SparkException: Task not serializable

Please suggest something.

Upvotes: 0

Views: 435

Answers (1)

sarveshseri

Reputation: 13985

One thing that is very important to understand about Spark is that it is a distributed environment.

The name RDD is short for Resilient Distributed Dataset. The items in a Spark RDD are generally divided into partitions, which are distributed across the various nodes of the Spark cluster.

When you call something like yourRdd.map(a => a.toString), the map implementation of this RDD knows that it first has to wrap the a => a.toString function in a closure, then serialize that closure, and then send it to all the nodes that hold partitions of this RDD. The actual computation of the result takes place on those nodes.

So... when you are dealing with RDDs, make sure that you don't confuse/mix the distribution-aware RDD API with normal Scala APIs.
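To see what "serialize that closure" means in practice, here is a minimal sketch (assuming the usual spark-shell sc is available, and using a made-up Settings class): capturing a non-serializable object inside an RDD operation is exactly what triggers the exception, while capturing a plain serializable value is fine.

class Settings { var prefix = ">" }              // does NOT extend Serializable

val settings = new Settings
val rdd = sc.parallelize(Seq("a", "b"))

// Throws org.apache.spark.SparkException: Task not serializable,
// because the closure has to capture and serialize `settings`:
// rdd.map(x => settings.prefix + x).collect()

// Capturing only a plain serializable local value works fine:
val prefix = settings.prefix
rdd.map(x => prefix + x).collect()               // Array(">a", ">b")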

The recommended way to write your piece of code would be:

val yourRdd = sc.textFile("hdfs://ip:8020/property.conf")

yourRdd.foreach { line =>
  val c = line.split("=")
  println(c(0) + " " + c(1))
}

Here, in your SparkConf.set(c(0), c(1)) line, SparkConf is a class, and you normally cannot serialize classes; nor can you call the member function set on the class SparkConf itself, you need to create an instance first. Also, SparkConf happens to be a class that does not implement the Serializable interface, so even instances of SparkConf are not serializable.

Normally you should not be using a Spark RDD to create your SparkConf, because an RDD will not exist without a SparkContext, which in turn needs a SparkConf to be initialized.

But for this case, let's say you need to do just that... then you first have to get a normal Scala collection from your RDD and then use it to populate your SparkConf.

val mySparkConf = new SparkConf()

val yourRdd = sc.textFile("hdfs://ip:8020/property.conf")

val yourList = yourRdd
  .map(line => line.split("="))
  .collect
  .toList

yourList.foreach(c => mySparkConf.set(c(0), c(1)))
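As a side note, since property.conf is just a small configuration file, a common alternative is to read it on the driver without Spark at all, so the SparkConf can be built before any SparkContext exists. A minimal sketch, assuming a copy of the file readable from the driver (the local path below is hypothetical):

import scala.io.Source
import org.apache.spark.SparkConf

val confFromFile = new SparkConf()

Source.fromFile("/local/path/to/property.conf")  // hypothetical local path
  .getLines()
  .filter(_.contains("="))
  .map(_.split("=", 2))
  .foreach(kv => confFromFile.set(kv(0).trim, kv(1).trim))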

Upvotes: 2
