Ashhar Hasan

Reputation: 912

Spark unable to read DECIMAL columns in Parquet files written by AvroParquetWriter

I have some Parquet files written using AvroParquetWriter (via the Kafka Connect S3 connector).

One of the columns in the file, aseg_lat, has the schema DECIMAL(9, 7).

I can read that column perfectly fine using both PyArrow and PrestoSQL.

When I try to read it via Spark 3.0.0 running on AWS EMR, I get the following error:

scala> var df2 = df.select("aseg_lat")
df2: org.apache.spark.sql.DataFrame = [aseg_lat: decimal(9,7)]

scala> df2.show()
20/08/25 12:03:35 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
20/08/25 12:04:35 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 448, ip-172-30-2-50.ec2.internal, executor 8): org.apache.spark.sql.execution.QueryExecutionException: Parquet column cannot be converted in file <redacted>. Column: [aseg_lat], Expected: decimal(9,7), Found: BINARY
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:213)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:122)
    at org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.hasNext(DataSourceScanExec.scala:559)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:345)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:872)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:872)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:127)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.constructConvertNotSupportedException(VectorizedColumnReader.java:298)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBinaryBatch(VectorizedColumnReader.java:603)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBatch(VectorizedColumnReader.java:268)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:285)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:183)
    at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:122)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:207)
    ... 20 more

20/08/25 12:04:38 ERROR TaskSetManager: Task 0 in stage 1.0 failed 4 times; aborting job
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 451, ip-172-30-2-50.ec2.internal, executor 5): org.apache.spark.sql.execution.QueryExecutionException: Parquet column cannot be converted in file <redacted>. Column: [aseg_lat], Expected: decimal(9,7), Found: BINARY
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:213)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:122)
    at org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.hasNext(DataSourceScanExec.scala:559)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:345)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:872)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:872)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:127)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.constructConvertNotSupportedException(VectorizedColumnReader.java:298)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBinaryBatch(VectorizedColumnReader.java:603)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBatch(VectorizedColumnReader.java:268)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:285)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:183)
    at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:122)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:207)
    ... 20 more

Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2175)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2124)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2123)
  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
  at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2123)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:990)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:990)
  at scala.Option.foreach(Option.scala:407)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:990)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2355)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2304)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2293)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:792)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2093)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2114)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2133)
  at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:472)
  at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:425)
  at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:47)
  at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3664)
  at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2737)
  at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3655)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:106)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:207)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:88)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3653)
  at org.apache.spark.sql.Dataset.head(Dataset.scala:2737)
  at org.apache.spark.sql.Dataset.take(Dataset.scala:2944)
  at org.apache.spark.sql.Dataset.getRows(Dataset.scala:301)
  at org.apache.spark.sql.Dataset.showString(Dataset.scala:338)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:864)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:823)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:832)
  ... 47 elided
Caused by: org.apache.spark.sql.execution.QueryExecutionException: Parquet column cannot be converted in file <redacted>. Column: [aseg_lat], Expected: decimal(9,7), Found: BINARY
  at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:213)
  at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:122)
  at org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.hasNext(DataSourceScanExec.scala:559)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
  at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:345)
  at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:872)
  at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:872)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
  at org.apache.spark.scheduler.Task.run(Task.scala:127)
  at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException
  at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.constructConvertNotSupportedException(VectorizedColumnReader.java:298)
  at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBinaryBatch(VectorizedColumnReader.java:603)
  at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBatch(VectorizedColumnReader.java:268)
  at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:285)
  at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:183)
  at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
  at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:122)
  at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:207)
  ... 20 more

I also tried using the Hive SerDe by setting spark.sql.hive.convertMetastoreParquet to false. That allows me to read the DECIMAL column, but it then starts failing on other columns, such as timestamps (error shown after the snippet below).
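The setting can be toggled from spark-shell (or placed in spark-defaults) roughly like this:

// Skip the native Parquet reader and use the Hive SerDe for metastore Parquet tables
spark.conf.set("spark.sql.hive.convertMetastoreParquet", "false")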

20/08/25 12:28:34 WARN DAGScheduler: Broadcasting large task binary with size 8.7 MiB
20/08/25 12:28:37 WARN TaskSetManager: Lost task 0.0 in stage 4.0 (TID 7, ip-172-30-2-50.ec2.internal, executor 6): java.lang.ClassCastException: org.apache.hadoop.io.LongWritable cannot be cast to org.apache.hadoop.hive.serde2.io.TimestampWritable
    at org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableTimestampObjectInspector.getPrimitiveJavaObject(WritableTimestampObjectInspector.java:39)
    at org.apache.spark.sql.hive.HadoopTableReader$.$anonfun$fillObject$14(TableReader.scala:468)
    at org.apache.spark.sql.hive.HadoopTableReader$.$anonfun$fillObject$14$adapted(TableReader.scala:467)
    at org.apache.spark.sql.hive.HadoopTableReader$.$anonfun$fillObject$18(TableReader.scala:493)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:346)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:872)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:872)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:127)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

20/08/25 12:28:39 ERROR TaskSetManager: Task 0 in stage 4.0 failed 4 times; aborting job
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 4.0 failed 4 times, most recent failure: Lost task 0.3 in stage 4.0 (TID 10, ip-172-30-2-50.ec2.internal, executor 6): java.lang.ClassCastException: org.apache.hadoop.io.LongWritable cannot be cast to org.apache.hadoop.hive.serde2.io.TimestampWritable
    at org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableTimestampObjectInspector.getPrimitiveJavaObject(WritableTimestampObjectInspector.java:39)
    at org.apache.spark.sql.hive.HadoopTableReader$.$anonfun$fillObject$14(TableReader.scala:468)
    at org.apache.spark.sql.hive.HadoopTableReader$.$anonfun$fillObject$14$adapted(TableReader.scala:467)
    at org.apache.spark.sql.hive.HadoopTableReader$.$anonfun$fillObject$18(TableReader.scala:493)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:346)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:872)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:872)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:127)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2175)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2124)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2123)
  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
  at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2123)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:990)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:990)
  at scala.Option.foreach(Option.scala:407)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:990)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2355)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2304)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2293)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:792)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2093)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2114)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2133)
  at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:472)
  at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:425)
  at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:47)
  at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3664)
  at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2737)
  at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3655)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:106)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:207)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:88)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3653)
  at org.apache.spark.sql.Dataset.head(Dataset.scala:2737)
  at org.apache.spark.sql.Dataset.take(Dataset.scala:2944)
  at org.apache.spark.sql.Dataset.getRows(Dataset.scala:301)
  at org.apache.spark.sql.Dataset.showString(Dataset.scala:338)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:864)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:823)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:832)
  ... 47 elided
Caused by: java.lang.ClassCastException: org.apache.hadoop.io.LongWritable cannot be cast to org.apache.hadoop.hive.serde2.io.TimestampWritable
  at org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableTimestampObjectInspector.getPrimitiveJavaObject(WritableTimestampObjectInspector.java:39)
  at org.apache.spark.sql.hive.HadoopTableReader$.$anonfun$fillObject$14(TableReader.scala:468)
  at org.apache.spark.sql.hive.HadoopTableReader$.$anonfun$fillObject$14$adapted(TableReader.scala:467)
  at org.apache.spark.sql.hive.HadoopTableReader$.$anonfun$fillObject$18(TableReader.scala:493)
  at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
  at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
  at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
  at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:346)
  at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:872)
  at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:872)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
  at org.apache.spark.scheduler.Task.run(Task.scala:127)
  at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)

Another observation: changing the DECIMAL(9, 7) to DECIMAL(x, 7) (where x > 19) allows Spark to read the column, but that is not a workable solution for me, since I have multiple TBs of historical data already written with DECIMAL(9, 7) that I would need to re-process.

How can I read DECIMAL columns written by AvroParquetWriter from Spark?

Upvotes: 3

Views: 8181

Answers (1)

Ashhar Hasan

Reputation: 912

Disabling Spark's vectorized Parquet reader lets Spark read such columns without issue; the non-vectorized (parquet-mr based) fallback reader appears to handle DECIMAL values stored as BINARY, which is exactly the conversion the vectorized reader rejects above (Expected: decimal(9,7), Found: BINARY). This was verified on both Spark 3.0.0 and Spark 2.4.4.

That is, set spark.sql.parquet.enableVectorizedReader to false, either on the SparkSession or in spark-defaults.
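For example, any of the standard ways of setting a Spark SQL conf should work; pick whichever fits your deployment:

// From an interactive session or application code, before reading the data:
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")

// Or when launching:
//   spark-shell --conf spark.sql.parquet.enableVectorizedReader=false

// Or cluster-wide, in spark-defaults.conf:
//   spark.sql.parquet.enableVectorizedReader false

Note that this disables the faster vectorized scan path, so large scans may get somewhat slower.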

Thanks to @mazaneicha for suggesting this option.

Upvotes: 6
