Paul

Reputation: 198

Spark Skip Bad Records while reading CSV

I want to load data from a .csv file into a Spark DataFrame, but I get an error message, most probably due to bad entries. Is there a way to skip bad lines programmatically?

Here is my Scala code:

val df = session.read
      .option("header", "true")
      .option("delimiter", delimiter)
      .option("inferSchema", "true")
      .option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ")
      .csv(csvFilePath)
    onData(df)

And here is the error log I got from Amazon EMR:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 61 in stage 1.0 failed 1 times, most recent failure: Lost task 61.0 in stage 1.0 (TID 62, localhost, executor driver): java.lang.NullPointerException
    at org.apache.spark.sql.execution.datasources.csv.CSVInferSchema$.org$apache$spark$sql$execution$datasources$csv$CSVInferSchema$$inferRowType(CSVInferSchema.scala:64)
    at org.apache.spark.sql.execution.datasources.csv.CSVInferSchema$$anonfun$2.apply(CSVInferSchema.scala:44)
    at org.apache.spark.sql.execution.datasources.csv.CSVInferSchema$$anonfun$2.apply(CSVInferSchema.scala:44)
    at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:142)
    at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:142)
    at scala.collection.Iterator$class.foreach(Iterator.scala:750)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1202)
    at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:142)
    at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1202)
    at scala.collection.TraversableOnce$class.aggregate(TraversableOnce.scala:199)
    at scala.collection.AbstractIterator.aggregate(Iterator.scala:1202)
    at org.apache.spark.rdd.RDD$$anonfun$aggregate$1$$anonfun$22.apply(RDD.scala:1113)
    at org.apache.spark.rdd.RDD$$anonfun$aggregate$1$$anonfun$22.apply(RDD.scala:1113)
    at org.apache.spark.SparkContext$$anonfun$33.apply(SparkContext.scala:2125)
    at org.apache.spark.SparkContext$$anonfun$33.apply(SparkContext.scala:2125)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

(Would it be more helpful to provide the whole stack trace?)

Thanks a lot!

The data is from a Reddit comments dataset: https://www.kaggle.com/reddit/reddit-comments-may-2015 The data looks like this (sorry, it has 17 columns; I think a screenshot is the best way to show you those lines): https://i.sstatic.net/IOX4H.jpg

Upvotes: 1

Views: 7456

Answers (2)

data_addict

Reputation: 894

If you glance at the source code of InferSchema.scala, at line 64 it tries to call length on next; that is where you are getting the NullPointerException.

https://github.com/databricks/spark-csv/blob/master/src/main/scala/com/databricks/spark/csv/util/InferSchema.scala

I suspect there is still an issue with the data even after dropping the malformed records. You could try disabling inferSchema (or creating your own schema and passing it to the reader; a sketch of that option follows the code below).

val df = session.read
      .option("header", "true")
      .option("delimiter", delimiter)
      .option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ")
      .option("mode", "DROPMALFORMED")
      .csv(csvFilePath) 

Just a thought after looking at the source code.
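
If you want to go the explicit-schema route instead, here is a rough sketch. The column names and types are only placeholders, since the actual 17 columns of the dataset aren't listed in the question:

import org.apache.spark.sql.types._

// Placeholder schema -- replace with the real 17 columns and types of your file
val commentSchema = StructType(Seq(
  StructField("id", StringType, nullable = true),
  StructField("body", StringType, nullable = true),
  StructField("score", IntegerType, nullable = true),
  StructField("created_utc", StringType, nullable = true)
))

val df = session.read
  .option("header", "true")
  .option("delimiter", delimiter)
  .option("mode", "DROPMALFORMED")
  .schema(commentSchema)   // no inferSchema pass over the data
  .csv(csvFilePath)

With a user-supplied schema, Spark skips the schema-inference scan entirely, so the code path that throws the NullPointerException is never executed.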

Upvotes: 1

dassum

Reputation: 5113

Use option("mode", "DROPMALFORMED") to skip bad rows:

val df = session.read
          .option("header", "true")
          .option("delimiter", delimiter)
          .option("inferSchema", "true")
          .option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ")
          .option("mode", "DROPMALFORMED")
          .csv(csvFilePath)
        onData(df)
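
If you would rather inspect the bad rows instead of silently dropping them, PERMISSIVE mode together with columnNameOfCorruptRecord keeps the raw text of each malformed line in an extra column. This is only a sketch with placeholder column names; note that for CSV you have to supply a schema that contains the corrupt-record column:

import org.apache.spark.sql.types._

// Placeholder schema; the extra string column receives the raw text of malformed rows
val schemaWithCorrupt = StructType(Seq(
  StructField("id", StringType, nullable = true),
  StructField("body", StringType, nullable = true),
  StructField("_corrupt_record", StringType, nullable = true)
))

val df = session.read
  .option("header", "true")
  .option("delimiter", delimiter)
  .option("mode", "PERMISSIVE")
  .option("columnNameOfCorruptRecord", "_corrupt_record")
  .schema(schemaWithCorrupt)
  .csv(csvFilePath)

df.cache()   // newer Spark versions refuse to query only the corrupt-record column straight from the file
df.filter(df("_corrupt_record").isNotNull).show(false)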

Upvotes: 10
