Reputation: 47
Environment: spark-3.0.1-bin-hadoop2.7, ScalaLibraryContainer 2.12.3, Scala, SparkSQL, eclipse-jee-oxygen-2-linux-gtk-x86_64
I have a CSV file with 3 columns of types String, Long, and Date. I converted the CSV file to a DataFrame and want to show it, but it throws the following error:
java.lang.ArrayIndexOutOfBoundsException: 2
at org.apache.spark.examples.sql.SparkSQLExample5$.$anonfun$runInferSchemaExample$2(SparkSQLExample5.scala:30)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:448)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:448)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:340)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:872)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:872)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:127)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
The error points to this line of Scala code:
.map(attributes => Person(attributes(0), attributes(1),attributes(2))).toDF();
The error occurs when subsequent rows have fewer values than the number of values in the header. Basically I am trying to read data from a CSV using Scala and Spark where some columns have null values.
The rows do not all have the same number of columns. It runs successfully if every row has 3 column values.
package org.apache.spark.examples.sql

import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import java.sql.Date
import org.apache.spark.sql.functions._
import java.util.Calendar

object SparkSQLExample5 {

  case class Person(name: String, age: String, birthDate: String)

  def main(args: Array[String]): Unit = {
    val fromDateTime = java.time.LocalDateTime.now
    val spark = SparkSession.builder().appName("Spark SQL basic example").config("spark.master", "local").getOrCreate()
    import spark.implicits._
    runInferSchemaExample(spark)
    spark.stop()
  }

  private def runInferSchemaExample(spark: SparkSession): Unit = {
    import spark.implicits._
    println("1. Creating an RDD of 'Person' objects and converting it into a 'DataFrame'. " +
      "2. Registering the DataFrame as a temporary view.")
    println("1. Third column of second row is not present. Last value of second row is a comma.")
    val peopleDF = spark.sparkContext
      .textFile("examples/src/main/resources/test.csv")
      .map(_.split(","))
      .map(attributes => Person(attributes(0), attributes(1), attributes(2))).toDF()
    val finalOutput = peopleDF.select("name", "age", "birthDate")
    finalOutput.show()
  }
}
The CSV file:
col1,col2,col3
row21,row22,
row31,row32,
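For reference, a minimal sketch of why the split yields fewer fields for those rows (assuming the trailing-comma rows shown above; Scala delegates to java.lang.String.split, which drops trailing empty strings unless a negative limit is passed):

// String.split discards trailing empty strings by default, so a row
// ending in a comma yields only two fields and attributes(2) is out of bounds.
"row21,row22,".split(",")     // Array("row21", "row22")
"row21,row22,".split(",", -1) // Array("row21", "row22", "") -- trailing empty field kept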
Upvotes: 2
Views: 1137
Reputation: 2345
Input CSV file:
col1,col2,col3
row21,row22,
row31,row32,
Code:
import org.apache.spark.sql.SparkSession

object ReadCsvFile {

  case class Person(name: String, age: String, birthDate: String)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("Spark SQL basic example").config("spark.master", "local").getOrCreate()
    readCsvFileAndInferCustomSchema(spark)
    spark.stop()
  }

  private def readCsvFileAndInferCustomSchema(spark: SparkSession): Unit = {
    val df = spark.read.csv("C:/Users/Ralimili/Desktop/data.csv")
    // Skip the header row (the first row of the first partition).
    val rdd = df.rdd.mapPartitionsWithIndex { (idx, iter) => if (idx == 0) iter.drop(1) else iter }
    val mapRdd = rdd.map(attributes => {
      Person(attributes.getString(0), attributes.getString(1), attributes.getString(2))
    })
    val finalDf = spark.createDataFrame(mapRdd)
    finalDf.show(false)
  }
}
Output:
+-----+-----+---------+
|name |age |birthDate|
+-----+-----+---------+
|row21|row22|null |
|row31|row32|null |
+-----+-----+---------+
If you want to fill in some value instead of null values, use the code below:
val customizedNullDf = finalDf.na.fill("No data")
customizedNullDf.show(false);
Output:
+-----+-----+---------+
|name |age |birthDate|
+-----+-----+---------+
|row21|row22|No data |
|row31|row32|No data |
+-----+-----+---------+
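If only certain columns should be filled, na.fill also accepts a per-column map; a small sketch building on the finalDf above (the variable name is just for illustration):

// Fill only the birthDate column; nulls in other columns are left untouched.
val filledByColumn = finalDf.na.fill(Map("birthDate" -> "No data"))
filledByColumn.show(false)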
Upvotes: 0
Reputation: 175
Try PERMISSIVE mode when reading the CSV file; it will add NULL for the missing fields:
val df = spark.sqlContext.read.format("csv").option("mode", "PERMISSIVE").load("examples/src/main/resources/test.csv")
You can find more information at https://docs.databricks.com/data/data-sources/read-csv.html
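A slightly fuller sketch of the same idea, assuming the test.csv from the question and adding the header option so the first row is used for column names rather than data:

import org.apache.spark.sql.SparkSession

object PermissiveCsvRead {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("Permissive CSV read")
      .config("spark.master", "local")
      .getOrCreate()

    // PERMISSIVE is the default mode; missing trailing fields become null
    // instead of failing the job.
    val df = spark.read
      .format("csv")
      .option("mode", "PERMISSIVE")
      .option("header", "true")
      .load("examples/src/main/resources/test.csv")

    df.show(false)
    spark.stop()
  }
}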
Upvotes: 1