Reputation: 81
I am new to Scala. I am executing the code below on Spark:
scala> for (line <- sc.textFile("hdfs://ip:8020/property.conf")) {
  val c = line.split("=")
  SparkConf.set(c(0), c(1))
  //println(c(0)+" "+c(1)) //Commented
}
If I uncomment the println and comment out SparkConf.set(c(0), c(1)), it works fine and displays the data. But in this case I want to set parameters on SparkConf at runtime, and then it throws this error:
org.apache.spark.SparkException: Task not serializable
Please suggest something.
Upvotes: 0
Views: 435
Reputation: 13985
One thing that is very important to understand about Spark is that it is a distributed environment. The name RDD is short for Resilient Distributed Dataset: the items in a Spark RDD are generally divided into partitions, which are distributed across the various nodes of the Spark cluster.
When you call something like yourRdd.map(a => a.toString), the map implementation of this RDD knows that it first has to wrap the a => a.toString function in a closure, then serialize that closure, and then send it to all the nodes which hold partitions of this RDD. The actual computation of the result takes place on those nodes.
So... when you are dealing with RDDs, make sure that you don't confuse or mix the distribution-aware RDD API with the normal Scala API.
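To make the closure-serialization point concrete, here is a minimal sketch you could try in spark-shell. The Settings class and the sample data are illustrative assumptions on my part, not something from the question:
// Hypothetical class that does NOT extend Serializable.
class Settings { val prefix = "conf." }
val settings = new Settings
val rdd = sc.parallelize(Seq("a=1", "b=2"))
// The next line would fail with "org.apache.spark.SparkException:
// Task not serializable": the closure references `settings`, so Spark
// tries (and fails) to serialize the whole Settings instance.
// rdd.map(line => settings.prefix + line).collect()
// Workaround: copy the needed value into a serializable local first.
val prefix = settings.prefix             // String is serializable
rdd.map(line => prefix + line).collect() // works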
The recommended way to write your piece of code would be:
val yourRdd = sc.textFile("hdfs://ip:8020/property.conf")
yourRdd.foreach { line =>
  val c = line.split("=")
  println(c(0) + " " + c(1))
}
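One caveat: on a real cluster this println runs on the executors, so its output ends up in the executor logs rather than your driver console. If you just want to inspect a small file like this one from the shell, a sketch that brings the data back to the driver first (assuming the file comfortably fits in driver memory) would be:
yourRdd.collect().foreach { line =>
  val c = line.split("=")
  println(c(0) + " " + c(1))
}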
Here, in your SparkConf.set(c(0), c(1)) line, SparkConf is a class, and you normally cannot serialize classes. Neither can you call the member function set on the class SparkConf itself; you need to create an instance first. Also, SparkConf happens to be a class which does not implement the serializable interface, and hence even instances of SparkConf are not serializable.
Normally you should not be using a Spark RDD to create your SparkConf, as an RDD cannot exist without a SparkContext, which in turn needs a SparkConf to be initialized. But let's say that for this case you needed to do just that... then you would first have to get a normal Scala list from your RDD and then use that to populate your SparkConf:
val mySparkConf = new SparkConf()
val yourRdd = sc.textFile("hdfs://ip:8020/property.conf")
val yourList = yourRdd
  .map(line => line.split("="))
  .collect()
  .toList
yourList.foreach(c => mySparkConf.set(c(0), c(1)))
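If the file may contain blank or malformed lines, a slightly more defensive variant (this filtering is my assumption, not something the question requires) would be:
val pairs = yourRdd
  .map(line => line.split("=", 2)) // split on the first '=' only
  .filter(_.length == 2)           // drop lines without a key=value pair
  .collect()
pairs.foreach { case Array(k, v) => mySparkConf.set(k, v) }
Note that mySparkConf can then only be used to configure a new application (for example a new SparkContext); it will not reconfigure the sc that is already running.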
Upvotes: 2