Reputation: 2147
I have a Spark job continuously uploading Parquet files to S3 (with partitions).
The files all share the same Parquet schema.
One of the field types was recently changed (from string to long), so the Parquet schema for some of the partitions is mixed.
Partitions containing data written with both types now fail to read some of the content.
While sqlContext.read.load(path) seems to execute successfully, applying any fetch operation to the resulting DataFrame (collect, for example) fails with a ParquetDecodingException.
I intend to migrate the data and re-format it, but I cannot read the mixed content into a DataFrame.
How can I load the mixed partitions into DataFrames, or any other Spark construct, using Apache Spark?
Following is the ParquetDecodingException trace:
scala> df.collect
[Stage 1:==============> (1 + 3) / 4]
WARN TaskSetManager: Lost task 1.0 in stage 1.0 (TID 2, 172.1.1.1, executor 0): org.apache.parquet.io.ParquetDecodingException:
Can not read value at 1 in block 0 in file
s3a://data/parquet/partition_by_day=20180620/partition_by_hour=10/part-00000-6e4f07e4-3d89-4fad-acdf-37054107dc39.snappy.parquet
at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:243)
at org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:227)
at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:102)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:166)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:102)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassCastException: [B cannot be cast to java.lang.Long
at scala.runtime.BoxesRunTime.unboxToLong(BoxesRunTime.java:105)
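For reference, schema inference alone does not decode any rows, so I can at least check each partition's inferred type individually. A minimal sketch (my_field stands in for the actual field name):

// Inspect each partition separately; no decoding happens until an action,
// so this works even for partitions that fail to collect.
val partitions = Seq(
  "s3a://data/parquet/partition_by_day=20180620/partition_by_hour=10"
  // ...one entry per partition to check
)
partitions.foreach { path =>
  val dataType = sqlContext.read.parquet(path).schema("my_field").dataType
  println(s"$path -> $dataType")  // StringType for old files, LongType for new
}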
Upvotes: 0
Views: 4048
Reputation: 2901
As far as I know, you cannot mix two schemas in which the same field has different types. Therefore, the only solution I can think of is to re-write each file to a new location, transforming the data to the right schema along the way.
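A minimal sketch of that migration, assuming the type change happened at a day boundary and that the changed field is named my_field (both assumptions; if the cut-over happened mid-day, list the hourly partitions instead):

import org.apache.spark.sql.functions.col

// Partitions written before the change carry the field as a string: cast it.
val oldDf = sqlContext.read
  .option("basePath", "s3a://data/parquet")   // keep the partition columns
  .parquet("s3a://data/parquet/partition_by_day=20180619")
  .withColumn("my_field", col("my_field").cast("long"))

// Partitions written after the change already carry the field as a long.
val newDf = sqlContext.read
  .option("basePath", "s3a://data/parquet")
  .parquet("s3a://data/parquet/partition_by_day=20180620")

// Write the unified data to a new location with the same partitioning.
oldDf.union(newDf)
  .write
  .partitionBy("partition_by_day", "partition_by_hour")
  .parquet("s3a://data/parquet_migrated")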
Upvotes: 3
Reputation: 2938
There is another idea: instead of changing the type of the existing field (field_string), add a new field of the long type (field_long), enable schema merging, and update the code that reads the data to something like the snippet below. Note that schema merging has been disabled by default since Spark 1.5, so it has to be requested explicitly:
import org.apache.spark.sql.functions.{coalesce, col}

// parquetPath stands in for the <parquet_file> location
val df = sqlContext.read.option("mergeSchema", "true").parquet(parquetPath)

// After the merge, old files populate only field_string and new files only
// field_long, so prefer the long column and fall back to casting the string one.
val unified = df.withColumn("field_value_long",
  coalesce(col("field_long"), col("field_string").cast("long")))
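The writer side of the same idea might look like this (a sketch; incomingBatch and its string column are stand-ins for the actual job): new files populate only the long column, old files keep field_string, and mergeSchema surfaces both columns with nulls where a file lacks one, which is exactly what the coalesce above relies on.

import org.apache.spark.sql.functions.col

// Going forward, populate only the new long-typed column.
val outgoing = incomingBatch
  .withColumn("field_long", col("field_string").cast("long"))
  .drop("field_string")

outgoing.write
  .mode("append")
  .partitionBy("partition_by_day", "partition_by_hour")
  .parquet("s3a://data/parquet")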
Upvotes: 0