I'm getting a "Task not serializable" error from the following code.
var inputRDD = sc.textFile(inputPath).flatMap(line => line.split(":")).map{word => (word)}
var i = 1
var characters: Array[String] = new Array[String](3)
characters = Array("a", "i", "m")
for (i <- 1 to 4) {
  inputRDD.foreach { word =>
    sc.broadcast(replacement)
    val result = word.replaceAll(replacement, "b")
    println(word, result)
    replacement = characters(i)
  }
}
I couldn't find any help online, so I need some help here. Thanks!
Upvotes: 0
Views: 654
Reputation: 27470
@maasg has already answered your question. But the code is so wrong, I think you need some help to get it working. I don't know if you've read through http://spark.apache.org/docs/latest/programming-guide.html.
Your mistakes:

- map{word => (word)} in the first line does nothing. It just maps each word to itself.
- i on the second line is unused. The for loop declares a new variable; you don't need to pre-declare the loop variable.
- characters is not used and immediately replaced. Why not start with the final value? Then all vars could be vals. vals are easier to work with because they don't change values.
- In the body of foreach you try to create a broadcast variable for replacement, which is undeclared. sc.broadcast has to be called in code that runs on the driver (the body of foreach runs in the executors). sc.broadcast returns a broadcast variable, which you can then use inside the code that runs on the executors. Instead, you just drop the returned variable on the ground.
- broadcast is only important for large values (like >10 kB). Don't worry about it for now.
- You index into characters from 1 to 4. The valid indexes are from 0 to 2.
- foreach cannot be used to modify an RDD. RDDs are immutable. You need to use map instead to create a new, modified RDD from the original RDD.

My guess for what you're trying to do is to load a file, split on colons, and replace three letters (a, i, m) with b. Here's how to do it:
import org.apache.spark.rdd.RDD

def replaceLetters(filename: String,
                   patterns: Seq[String],
                   replacement: String): RDD[String] = {
  val words = sc.textFile(filename).flatMap(_.split(":"))
  words.map { word =>
    patterns.foldLeft(word) { (word, pattern) =>
      word.replaceAll(pattern, replacement)
    }
  }
}
// Call our function, print the results.
replaceLetters("my_file", "a,i,m".split(","), "b").collect.foreach(println(_))
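The foldLeft is the key trick: it threads each word through the patterns, applying one replacement per step. Here is a plain-Scala illustration of the same fold that runs without Spark (FoldLeftDemo is just a name for this sketch):

```scala
object FoldLeftDemo {
  // Apply each pattern's replacement in sequence, threading the
  // intermediate result (acc) from one step to the next.
  def replaceAll(word: String, patterns: Seq[String], replacement: String): String =
    patterns.foldLeft(word)((acc, pattern) => acc.replaceAll(pattern, replacement))

  def main(args: Array[String]): Unit = {
    // "maim": a->b gives "mbim", then i->b gives "mbbm", then m->b gives "bbbb"
    println(replaceAll("maim", Seq("a", "i", "m"), "b"))  // prints "bbbb"
  }
}
```

Because foldLeft returns a plain String, the function passed to words.map stays serializable and ships cleanly to the executors.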
Upvotes: 0
Reputation: 37435
This code tries to use the SparkContext (sc
) within a closure that runs on the RDD's elements. The SparkContext
is not serializable and is not meant to be serialized and shipped to executors.
It's unclear to me what this code is trying to achieve, but it will need to be changed to remove the use of sparkContext
from any closure.
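To see why this fails: Spark serializes the closure, and everything it captures, before shipping it to executors, and a captured SparkContext cannot be serialized. A minimal plain-Scala sketch of the same failure, using a hypothetical FakeContext as a stand-in for the non-serializable SparkContext:

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for SparkContext: it does not implement Serializable.
class FakeContext

object ClosureDemo {
  // Try to serialize a closure-like object that has captured a FakeContext,
  // the way a foreach body captures sc. Returns what happened.
  def trySerialize(): String = {
    val closure = new Serializable {
      val ctx = new FakeContext // non-serializable field, like a captured sc
    }
    val out = new ObjectOutputStream(new ByteArrayOutputStream)
    try {
      out.writeObject(closure)
      "ok"
    } catch {
      case _: NotSerializableException => "NotSerializableException"
    }
  }

  def main(args: Array[String]): Unit =
    println(trySerialize())
}
```

The fix is the same in both cases: keep every reference to sc on the driver side, and let the closure capture only plain serializable values (or broadcast variables).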
Upvotes: 1