Reputation: 198
I want to load data from a .csv file into a Spark DataFrame, but I get an error message, most probably due to bad entries. Is there a way to skip bad lines programmatically?
Here is my Scala code:
val df = session.read
.option("header", "true")
.option("delimiter", delimiter)
.option("inferSchema", "true")
.option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ")
.csv(csvFilePath)
onData(df)
And here is the error log I got from Amazon EMR:
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 61 in stage 1.0 failed 1 times, most recent failure: Lost task 61.0 in stage 1.0 (TID 62, localhost, executor driver): java.lang.NullPointerException
at org.apache.spark.sql.execution.datasources.csv.CSVInferSchema$.org$apache$spark$sql$execution$datasources$csv$CSVInferSchema$$inferRowType(CSVInferSchema.scala:64)
at org.apache.spark.sql.execution.datasources.csv.CSVInferSchema$$anonfun$2.apply(CSVInferSchema.scala:44)
at org.apache.spark.sql.execution.datasources.csv.CSVInferSchema$$anonfun$2.apply(CSVInferSchema.scala:44)
at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:142)
at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:142)
at scala.collection.Iterator$class.foreach(Iterator.scala:750)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1202)
at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:142)
at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1202)
at scala.collection.TraversableOnce$class.aggregate(TraversableOnce.scala:199)
at scala.collection.AbstractIterator.aggregate(Iterator.scala:1202)
at org.apache.spark.rdd.RDD$$anonfun$aggregate$1$$anonfun$22.apply(RDD.scala:1113)
at org.apache.spark.rdd.RDD$$anonfun$aggregate$1$$anonfun$22.apply(RDD.scala:1113)
at org.apache.spark.SparkContext$$anonfun$33.apply(SparkContext.scala:2125)
at org.apache.spark.SparkContext$$anonfun$33.apply(SparkContext.scala:2125)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
(Would it be more helpful to provide the whole stack trace?)
Thanks a lot!
The data is from a Reddit dataset of comments. Here it is: https://www.kaggle.com/reddit/reddit-comments-may-2015 The data looks like this (sorry, it has 17 columns; I think this is the best way to show you those lines): https://i.sstatic.net/IOX4H.jpg
Upvotes: 1
Views: 7456
Reputation: 894
If you have a glance at the source code, CSVInferSchema.scala,
at line 64 it calls length on next, which is where you are getting the NullPointerException.
I suspect there is still an issue with the data even after dropping the malformed records. You could try disabling inferSchema (or create your own schema and pass it in; see the sketch after the snippet below):
val df = session.read
.option("header", "true")
.option("delimiter", delimiter)
.option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ")
.option("mode", "DROPMALFORMED")
.csv(csvFilePath)
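If you go the explicit-schema route, here is a minimal sketch. The column names and types below are placeholders (the Kaggle Reddit dump has 17 columns, so you would list all of them with their real names and types); the point is that passing a schema skips inference, so the CSVInferSchema code path is never hit:
import org.apache.spark.sql.types._

// Hypothetical schema -- replace these fields with the actual 17 columns of the file.
val redditSchema = StructType(Seq(
  StructField("id", StringType, nullable = true),
  StructField("body", StringType, nullable = true),
  StructField("score", IntegerType, nullable = true),
  StructField("created_utc", TimestampType, nullable = true)
  // ...remaining columns...
))

val df = session.read
  .option("header", "true")
  .option("delimiter", delimiter)
  .option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ")
  .option("mode", "DROPMALFORMED")
  .schema(redditSchema) // explicit schema, so no inference pass over the data
  .csv(csvFilePath)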
Just a thought after looking at the source code.
Upvotes: 1
Reputation: 5113
Use option("mode", "DROPMALFORMED") to skip bad rows:
val df = session.read
.option("header", "true")
.option("delimiter", delimiter)
.option("inferSchema", "true")
.option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ")
.option("mode", "DROPMALFORMED")
.csv(csvFilePath)
onData(df)
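As a quick sanity check, you can then inspect the inferred schema and a few of the surviving rows:
df.printSchema()
df.show(5, truncate = false)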
Upvotes: 10