Tarun Khaneja
Tarun Khaneja

Reputation: 451

org.apache.spark.SparkException: Task not serializable -- Scala

I am reading a text file and it is fixed width file which I need to convert to csv. My program works fine in local machine but when I run it on cluster, it throws "Task not serializable" exception.

I tried to solve same problem with map and mapPartition.

It works fine by using toLocalIterator on RDD. But it doesm't work with large file(I have files of 8GB)

Below is code by using mapPartition which I recently tried

//reading source file and creating RDD

def main(){
    var inpData = sc.textFile(s3File)
    LOG.info(s"\n inpData >>>>>>>>>>>>>>> [${inpData.count()}]")

    val rowRDD = inpData.mapPartitions(iter=>{
    var listOfRow = new ListBuffer[Row]
    while(iter.hasNext){
       var line = iter.next()
       if(line.length() >= maxIndex){
          listOfRow += getRow(line,indexList)
        }else{
          counter+=1
        }
     }
    listOfRow.toIterator
    })

    rowRDD .foreach(println)
}

case class StartEnd(startingPosition: Int, endingPosition: Int) extends Serializable

def getRow(x: String, inst: List[StartEnd]): Row = {
    val columnArray = new Array[String](inst.size)
    for (f <- 0 to inst.size - 1) {
      columnArray(f) = x.substring(inst(f).startingPosition, inst(f).endingPosition)
    }
    Row.fromSeq(columnArray)
}

//Note : for your refernce, indexList I have created using StartEnd case class, which looks like below after creation

[List(StartEnd(0,4), StartEnd(4,10), StartEnd(7,12), StartEnd(10,14))]

This program works fine in my local machine. But when I put on cluster(AWS) it throws exception as shown below.

>>>>>>>>Map(ResultantDF -> [], ExceptionString -> 
Exception occurred while applying the FileConversion transformation and the exception Message is :Task not serializable
Exception occurred while applying the FileConversion transformation and the exception Message is :Task not serializable)
[Driver] TRACE reflection.ReflectionInvoker$.invokeDTMethod - Exit

I am not able to understand what's wrong here and what is not serializable, why it is throwing exception.

Any help is appreciated. Thanks in advance!

Upvotes: 0

Views: 4354

Answers (1)

simpadjo
simpadjo

Reputation: 4017

You call getRow method inside a Spark mapPartition transformation. It forces spark to pass an instance of you main class to workers. The main class contains LOG as a field. Seems that this log is not serialization-friendly. You can

a) move getRow of LOG to a different object (general way to solve such issues)

b) make LOG a lazy val

c) use another logging library

Upvotes: 3

Related Questions