user835281

"Task not serializable" error in Spark code

I'm getting Task not serializable error from the following code.

var inputRDD = sc.textFile(inputPath).flatMap(line => line.split(":")).map{word => (word)}
var i = 1
var characters: Array[String] = new Array[String](3)
characters = Array("a", "i", "m")
for (i <- 1 to 4) {
  inputRDD.foreach { word =>
    sc.broadcast(replacement)
    val result = word.replaceAll(replacement, "b")
    println(word, result)
    replacement = characters(i)
  }
}

I couldn't find any help online. I need some help here. Thanks.

Upvotes: 0

Views: 654

Answers (2)

Daniel Darabos

Reputation: 27470

@maasg has already answered your question. But the code is so wrong that I think you need some extra help to get it working. I don't know if you've read through http://spark.apache.org/docs/latest/programming-guide.html.

Your mistakes:

  • map{word => (word)} in the first line does nothing. It just maps each word to itself.
  • The i on the second line is unused. The for loop declares a new variable. You don't need to pre-declare the loop variable.
  • The initial value of characters is never used; it is immediately replaced. Why not start with the final value? Then all the vars could be vals. vals are easier to work with because their values never change.
  • Inside the foreach you try to create a broadcast variable for replacement, which is undeclared. sc.broadcast has to be called in code that runs on the driver (the body of foreach runs on the executors). sc.broadcast returns a broadcast variable, which you can then use inside the code that runs on the executors. Instead, you just drop the returned variable on the ground. (See the sketch after this list.)
  • broadcast is only important for large values (like >10kB). Don't worry about it for now.
  • You try to index a 3-element array (characters) from 1 to 4. The valid indexes are from 0 to 2.
  • foreach cannot be used to modify an RDD. RDDs are immutable. You need to use map instead to create a new, modified RDD from the original RDD.
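
For reference, here is a minimal sketch of the broadcast pattern described in that item (the file name and the broadcast value are made up for illustration):

import org.apache.spark.rdd.RDD

// Created on the driver; sc is the SparkContext available in the shell.
val words: RDD[String] = sc.textFile("my_file").flatMap(_.split(":"))
val replacement = sc.broadcast("a")  // broadcast happens on the driver

// The closure runs on the executors and reads the broadcast with .value.
val replaced: RDD[String] = words.map(word => word.replaceAll(replacement.value, "b"))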

My guess is that you're trying to load a file, split it on colons, and replace three letters (a, i, m) with b. Here's how to do it:

import org.apache.spark.rdd.RDD

def replaceLetters(filename: String,
                   patterns: Seq[String],
                   replacement: String): RDD[String] = {
  // Load the file and split each line on colons to get the words.
  val words = sc.textFile(filename).flatMap(_.split(":"))
  // Apply every pattern in turn to each word.
  words.map { word =>
    patterns.foldLeft(word) {
      (word, pattern) => word.replaceAll(pattern, replacement)
    }
  }
}

// Call our function, print the results.
replaceLetters("my_file", "a,i,m".split(","), "b").collect.foreach(println(_))

Upvotes: 0

maasg

Reputation: 37435

This code tries to use the SparkContext (sc) within a closure on an RDD. The SparkContext is not serializable and is not meant to be serialized; it should only be used on the driver.

It's unclear to me what this code is trying to achieve, but it will need to be changed to remove the use of the SparkContext from any closure.
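
To illustrate, here is a minimal sketch of the wrong and right patterns (the rdd value is made up for the example):

// Wrong: the closure captures sc; sc cannot be serialized and shipped
// to the executors, so Spark throws "Task not serializable".
rdd.foreach { word => sc.broadcast(word) }

// Right: anything involving sc runs on the driver, outside the closure.
// The executors only receive the serializable broadcast handle.
val pattern = sc.broadcast("a")
rdd.foreach { word => println(word.replaceAll(pattern.value, "b")) }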

Upvotes: 1
