Leet-Falcon

Reputation: 2147

How to load mixed Parquet schema into DataFrame using Apache Spark?

I have a Spark job continuously uploading Parquet files to S3 (with partitions).
The files all have the same Parquet schema.

One of the field types was recently changed (from String to long), so the Parquet schema across partitions is now mixed.

Partitions containing data of both types now fail to read some of the content.
It seems I can execute sqlContext.read.load(path), but as soon as I apply any fetch operation on the resulting DataFrame (collect, for example), the operation fails with a ParquetDecodingException.

I intend to migrate the data and re-format it, but I cannot read the mixed content into a DataFrame.
How can I load the mixed partitions into DataFrames (or any other Spark construct) using Apache Spark?

Following is the ParquetDecodingException trace:

scala> df.collect
[Stage 1:==============>        (1 + 3) / 4]
WARN TaskSetManager: Lost task 1.0 in stage 1.0 (TID 2, 172.1.1.1, executor 0): org.apache.parquet.io.ParquetDecodingException: 
Can not read value at 1 in block 0 in file 
s3a://data/parquet/partition_by_day=20180620/partition_by_hour=10/part-00000-6e4f07e4-3d89-4fad-acdf-37054107dc39.snappy.parquet
    at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:243)
    at org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:227)
    at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:102)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:166)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:102)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassCastException: [B cannot be cast to java.lang.Long
    at scala.runtime.BoxesRunTime.unboxToLong(BoxesRunTime.java:105)

Upvotes: 0

Views: 4048

Answers (2)

Ehud Lev

Reputation: 2901

As far as I know, you cannot mix two schemas that have the same field with different types. Therefore the only solution I can think of is to:

  1. List the files of the partition.

  2. Re-write each file to a new location, transforming the data to the right schema (see the sketch after this list).

  3. If the original data was partitioned, another pass is required to restore the partitioning, because re-writing the data file-by-file loses it.

  4. Check that all of the new partitions can be read with the right schema.

  5. Remove the "bad" partition and copy the tmp partition in its place.
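
A minimal sketch of step 2 in Scala, assuming the affected column is called field_to_fix and writing under a hypothetical parquet_tmp prefix (both names are placeholders, not from the question):

import org.apache.spark.sql.functions.col

// Hypothetical file list and locations -- adjust to your layout.
val files = Seq(
  "s3a://data/parquet/partition_by_day=20180620/partition_by_hour=10/part-00000.snappy.parquet")

files.foreach { file =>
  // Reading one file at a time keeps its schema consistent, so the cast succeeds.
  sqlContext.read.parquet(file)
    .withColumn("field_to_fix", col("field_to_fix").cast("long"))
    .write
    .parquet(file.replace("s3a://data/parquet", "s3a://data/parquet_tmp"))
}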

Upvotes: 3

Denis Makarenko

Reputation: 2938

There is another idea: instead of changing the type of the existing field (field_string), add a new field of the long type (field_long), enable schema merging, and update the code that reads the data along the lines below. Schema merging is disabled by default in recent Spark versions, so it is worth being explicit about it:

val df = sqlContext.read.option("mergeSchema", "true").parquet(<parquet_file>)

...

import org.apache.spark.sql.functions.{coalesce, col}

// Use the new long column when it is present; otherwise fall back to the
// old string column cast to long.
val withFieldLong = df.withColumn("field_value_long",
  coalesce(col("field_long"), col("field_string").cast("long")))
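
coalesce returns its first non-null argument, so rows written under the old schema (where field_long is null) fall back to the value of field_string cast to long.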

Upvotes: 0
