Mor Eru

Reputation: 1129

Spark & Scala - NullPointerException in RDD traversal

I have a number of CSV files and need to combine them into RDDs, grouped by part of their filenames.

For example, for the below files

$ ls
20140101_1.csv  20140101_2.csv  20140101_3.csv  20140201_1.csv
20140201_2.csv  20140201_3.csv  20140301_1.csv  20140301_3.csv

I need to combine the files matching 20140101*.csv into a single RDD to work on, and so on for each date prefix.

I am using sc.wholeTextFiles to read the entire directory and then grouping the filenames by their patterns to form a comma-separated string of filenames for each group. I am then passing each string to sc.textFile to open that group's files as a single RDD.

This is the code I have -

val files = sc.wholeTextFiles("*.csv")
val indexed_files = files.map(a => (a._1.split("_")(0), a._1))
val grouped = indexed_files.groupByKey

// join each group's filenames into a single comma-separated string
val data = grouped.map { a =>
  val name = a._2.mkString(",")
  (a._1, name)
}

data.foreach { a =>
  val file = sc.textFile(a._2) // NullPointerException is thrown here
  println(file.count)
}

And I get a SparkException caused by a NullPointerException when I try to call textFile inside foreach. The stack trace refers to an Iterator inside the RDD, and I am not able to understand the error -

15/07/21 15:37:37 INFO TaskSchedulerImpl: Removed TaskSet 65.0, whose tasks have all completed, from pool
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 65.0 failed 4 times, most recent failure: Lost task 1.3 in stage 65.0 (TID 115, 10.132.8.10): java.lang.NullPointerException
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:33)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:32)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:870)
        at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:870)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1765)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1765)

However, when I run sc.textFile(data.first._2).count directly in the spark shell, the RDD is created and the count is returned without any error.

Any help is greatly appreciated.

Upvotes: 0

Views: 661

Answers (1)

The Archetypal Paul

Reputation: 41749

Converting a comment to an answer:

var file = sc.textFile(a._2)

inside the foreach of another RDD isn't going to work. You can't nest RDDs like that: the function you pass to foreach runs on the executors, where the SparkContext (sc) is not available (it ends up null there), which is exactly the NullPointerException in the stack trace. RDDs can only be created and operated on from the driver.
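
A minimal sketch of one way to restructure it, assuming the (prefix, comma-separated filenames) pairs from the question's data RDD: collect the small list of groups back to the driver first, and only then create each per-group RDD there, where sc is available.

// collect() returns a local Array, so the loop below runs on the driver,
// where sc is available - no RDD nesting involved.
val fileGroups: Array[(String, String)] = data.collect()

fileGroups.foreach { case (prefix, names) =>
  // sc.textFile accepts a comma-separated list of paths
  val group = sc.textFile(names)
  println(s"$prefix: ${group.count}")
}

Each iteration then launches an ordinary Spark job from the driver, which is also why the sc.textFile(data.first._2).count call in the shell already works: first brings one element back to the driver before textFile is called.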

Upvotes: 2
