Reputation: 109
Learning Spark for Java and trying to read a .csv
file into a DataFrame
using the DataFrameReader
. I can't get even a super simple .csv
file to work, as I keep getting a java.lang.NegativeArraySizeException
.
Here is what I am doing:
public void test() {
DataFrameReader dataFrameReader = new DataFrameReader(getSparkSession());
StructType parentSchema = new StructType(new StructField[] {
DataTypes.createStructField("NAME", DataTypes.StringType, false),
});
Dataset<Row> parents = dataFrameReader.schema(parentSchema).csv("/Users/mjsee/Downloads/test.csv");
parents.show();
}
and here is how I am setting up my Spark session:
sparkSession = SparkSession.builder()
.appName(getApplicationName())
.master("local[*]")
.config("spark.driver.host", "localhost")
.getOrCreate();
and here is my test.csv
file:
"JESSE"
and here is my output:
java.lang.NegativeArraySizeException
at com.univocity.parsers.common.input.DefaultCharAppender.<init>(DefaultCharAppender.java:39) ~[Univocity-Parsers-2.x.jar:?]
at com.univocity.parsers.csv.CsvParserSettings.newCharAppender(CsvParserSettings.java:82) ~[Univocity-Parsers-2.x.jar:?]
at com.univocity.parsers.common.ParserOutput.<init>(ParserOutput.java:93) ~[Univocity-Parsers-2.x.jar:?]
at com.univocity.parsers.common.AbstractParser.<init>(AbstractParser.java:74) ~[Univocity-Parsers-2.x.jar:?]
at com.univocity.parsers.csv.CsvParser.<init>(CsvParser.java:59) ~[Univocity-Parsers-2.x.jar:?]
at org.apache.spark.sql.execution.datasources.csv.CsvReader.<init>(CSVParser.scala:49) ~[Spark-sql.jar:?]
at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat$$anonfun$buildReader$1.apply(CSVFileFormat.scala:158) ~[Spark-sql.jar:?]
at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat$$anonfun$buildReader$1.apply(CSVFileFormat.scala:146) ~[Spark-sql.jar:?]
at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:138) ~[Spark-sql.jar:?]
at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:122) ~[Spark-sql.jar:?]
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:150) ~[Spark-sql.jar:?]
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:102) ~[Spark-sql.jar:?]
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source) ~[?:?]
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) ~[Spark-sql.jar:?]
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377) ~[Spark-sql.jar:?]
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231) ~[Spark-sql.jar:?]
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225) ~[Spark-sql.jar:?]
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826) ~[Spark-core.jar:?]
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826) ~[Spark-core.jar:?]
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) ~[Spark-core.jar:?]
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) ~[Spark-core.jar:?]
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) ~[Spark-core.jar:?]
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) ~[Spark-core.jar:?]
at org.apache.spark.scheduler.Task.run(Task.scala:99) ~[Spark-core.jar:?]
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282) [Spark-core.jar:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_131]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_131]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_131]
15:45:29.544 [task-result-getter-0] ERROR org.apache.spark.scheduler.TaskSetManager - Task 0 in stage 0.0 failed 1 times; aborting job
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.lang.NegativeArraySizeException
at com.univocity.parsers.common.input.DefaultCharAppender.<init>(DefaultCharAppender.java:39)
at com.univocity.parsers.csv.CsvParserSettings.newCharAppender(CsvParserSettings.java:82)
at com.univocity.parsers.common.ParserOutput.<init>(ParserOutput.java:93)
at com.univocity.parsers.common.AbstractParser.<init>(AbstractParser.java:74)
at com.univocity.parsers.csv.CsvParser.<init>(CsvParser.java:59)
at org.apache.spark.sql.execution.datasources.csv.CsvReader.<init>(CSVParser.scala:49)
at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat$$anonfun$buildReader$1.apply(CSVFileFormat.scala:158)
at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat$$anonfun$buildReader$1.apply(CSVFileFormat.scala:146)
at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:138)
at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:122)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:150)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:102)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1918)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1931)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1944)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:333)
at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2371)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2765)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2370)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2377)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2113)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2112)
at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2795)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2112)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2327)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:248)
at org.apache.spark.sql.Dataset.show(Dataset.scala:636)
at org.apache.spark.sql.Dataset.show(Dataset.scala:595)
at org.apache.spark.sql.Dataset.show(Dataset.scala:604)
at ModelProcessingTest.testSTUFF(ModelProcessingTest.java:86)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.mockito.internal.runners.JUnit45AndHigherRunnerImpl.run(Unknown Source)
at org.mockito.runners.MockitoJUnitRunner.run(Unknown Source)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:51)
at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:237)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
Caused by: java.lang.NegativeArraySizeException
at com.univocity.parsers.common.input.DefaultCharAppender.<init>(DefaultCharAppender.java:39)
at com.univocity.parsers.csv.CsvParserSettings.newCharAppender(CsvParserSettings.java:82)
at com.univocity.parsers.common.ParserOutput.<init>(ParserOutput.java:93)
at com.univocity.parsers.common.AbstractParser.<init>(AbstractParser.java:74)
at com.univocity.parsers.csv.CsvParser.<init>(CsvParser.java:59)
at org.apache.spark.sql.execution.datasources.csv.CsvReader.<init>(CSVParser.scala:49)
at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat$$anonfun$buildReader$1.apply(CSVFileFormat.scala:158)
at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat$$anonfun$buildReader$1.apply(CSVFileFormat.scala:146)
at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:138)
at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:122)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:150)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:102)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Upvotes: 4
Views: 2262
Reputation: 6289
Author of the univocity-parsers library here. This is happening because internally Spark sets the maximum value length to -1
(meaning no limit). Support for this was introduced in univocity-parsers 2.2.0.
Just make sure the library version on your classpath is 2.2.0 or later and you should be fine, as older versions don't support setting the maxCharsPerColumn
property to -1
.
If you have multiple versions of that library on your classpath, get rid of the older ones. Ideally you'd want to update to the latest version (currently 2.4.1) and use only that. It should work just fine, as we make sure any changes made to the library are backward compatible.
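If you're not sure which version is actually being picked up at runtime, a quick sketch (assuming univocity-parsers is on your test classpath; the class name here is just for illustration) is to print the jar the CsvParser class was loaded from:
import com.univocity.parsers.csv.CsvParser;

public class CheckUnivocityJar {
    public static void main(String[] args) {
        // Prints the location of the jar that CsvParser was actually loaded from,
        // which makes it easy to spot a stale univocity-parsers version pulled in
        // transitively. Note: getCodeSource() can return null in some environments.
        System.out.println(CsvParser.class
                .getProtectionDomain()
                .getCodeSource()
                .getLocation());
    }
}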
Upvotes: 4
Reputation: 4025
Maybe you have a trailing comma after createStructField
in your parentSchema that is causing the issue?
StructType parentSchema = new StructType(new StructField[] {
DataTypes.createStructField("NAME", DataTypes.StringType, false)
});
Person (csv file under resources)
Jagan,Pantula,37,Singapore
Neeraja,Pantula,32,Singapore
Rama,Pantula,34,India
Rajya,Akundi,32,India
Viswanath,Pantula,42,India
Code
SparkSession session = getSession();
DataFrameReader reader = new DataFrameReader(session);
StructType parentSchema = new StructType(new StructField[] {
DataTypes.createStructField("FirstName", DataTypes.StringType, false),
DataTypes.createStructField("LastName", DataTypes.StringType, false),
DataTypes.createStructField("Age", DataTypes.IntegerType, false),
DataTypes.createStructField("Country", DataTypes.StringType, false)
});
String path = getClass().getClassLoader()
.getResource("Person")
.getPath()
.substring(1);
reader.schema(parentSchema).csv(path).show();
Output
+---------+--------+---+---------+
|FirstName|LastName|Age| Country|
+---------+--------+---+---------+
| Jagan| Pantula| 37|Singapore|
| Neeraja| Pantula| 32|Singapore|
| Rama| Pantula| 34| India|
| Rajya| Akundi| 32| India|
|Viswanath| Pantula| 42| India|
+---------+--------+---+---------+
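One side note on the path handling above: the .substring(1) only strips the leading slash that appears before Windows drive letters in the resource URL. A more portable sketch (the ResourcePath class name is just for illustration, and it assumes the resource actually exists on the classpath) converts the URL to a URI first:
import java.net.URISyntaxException;
import java.nio.file.Paths;

class ResourcePath {
    // Resolves a classpath resource to a filesystem path via URI, which avoids
    // manually stripping the leading slash and also handles spaces in the path.
    static String of(String resourceName) throws URISyntaxException {
        return Paths.get(ResourcePath.class.getClassLoader()
                .getResource(resourceName)
                .toURI())
                .toString();
    }
}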
Upvotes: 0