Reputation: 451
I am reading a fixed-width text file which I need to convert to CSV. My program works fine on my local machine, but when I run it on a cluster it throws a "Task not serializable" exception.
I tried to solve the same problem with both map and mapPartitions.
It works fine when I use toLocalIterator on the RDD, but that doesn't work with large files (I have files of 8 GB).
Below is the code using mapPartitions which I recently tried:
// reading source file and creating RDD
def main(): Unit = {
  var inpData = sc.textFile(s3File)
  LOG.info(s"\n inpData >>>>>>>>>>>>>>> [${inpData.count()}]")
  val rowRDD = inpData.mapPartitions(iter => {
    var listOfRow = new ListBuffer[Row]
    while (iter.hasNext) {
      var line = iter.next()
      if (line.length() >= maxIndex) {
        listOfRow += getRow(line, indexList)
      } else {
        counter += 1
      }
    }
    listOfRow.toIterator
  })
  rowRDD.foreach(println)
}
case class StartEnd(startingPosition: Int, endingPosition: Int) extends Serializable

def getRow(x: String, inst: List[StartEnd]): Row = {
  val columnArray = new Array[String](inst.size)
  for (f <- 0 until inst.size) {
    columnArray(f) = x.substring(inst(f).startingPosition, inst(f).endingPosition)
  }
  Row.fromSeq(columnArray)
}
// Note: for your reference, indexList is created from the StartEnd case class and looks like this after creation:
[List(StartEnd(0,4), StartEnd(4,10), StartEnd(7,12), StartEnd(10,14))]
This program works fine on my local machine, but when I run it on the cluster (AWS) it throws the exception shown below.
>>>>>>>>Map(ResultantDF -> [], ExceptionString ->
Exception occurred while applying the FileConversion transformation and the exception Message is :Task not serializable
Exception occurred while applying the FileConversion transformation and the exception Message is :Task not serializable)
[Driver] TRACE reflection.ReflectionInvoker$.invokeDTMethod - Exit
I am not able to understand what's wrong here, what exactly is not serializable, and why the exception is thrown.
Any help is appreciated. Thanks in advance!
Upvotes: 0
Views: 4354
Reputation: 4017
You call the getRow method inside a Spark mapPartitions transformation. That forces Spark to serialize an instance of your main class and ship it to the workers. The main class contains LOG as a field, and that logger is apparently not serialization-friendly.
You can:
a) move getRow and LOG to a separate object (the general way to solve such issues)
b) make LOG a lazy val
c) use another logging library
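For option (a), here is a minimal sketch of moving the parsing logic into a standalone object, so the closure shipped to the workers no longer drags the enclosing class (and its logger) along. The object name RowParser is made up, and getRow returns a plain Seq[String] here instead of a Spark Row, just to keep the sketch self-contained:

```scala
// Hypothetical standalone object: everything the mapPartitions closure needs
// lives here, so Spark no longer has to serialize the enclosing main class
// with its non-serializable LOG field.
object RowParser extends Serializable {
  case class StartEnd(startingPosition: Int, endingPosition: Int)

  // Same slicing logic as the question's getRow, returning Seq[String]
  // so this sketch runs without a Spark dependency.
  def getRow(line: String, spec: List[StartEnd]): Seq[String] =
    spec.map(se => line.substring(se.startingPosition, se.endingPosition))
}
```

Inside the transformation you would then call RowParser.getRow(line, indexList) and wrap the result with Row.fromSeq. For option (b), the usual idiom is to declare the logger as @transient lazy val, so serialization skips the field and each executor re-creates it on first use.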
Upvotes: 3