akm

Reputation: 123

Spark parquet schema evolution

I have a partitioned HDFS Parquet location which has a different schema in different partitions.

Say 5 columns in the first partition and 4 columns in the second. Now I try to read the base Parquet path and then filter the 2nd partition.

This gives me 5 columns in the DataFrame even though I have only 4 columns in the Parquet files of the 2nd partition. When I read the 2nd partition directly, it gives the correct 4 columns. How can I fix this?
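
A minimal sketch of the setup (the base path and partition column name are placeholders, assuming a SparkSession named spark):

import org.apache.spark.sql.functions.col

// hypothetical base path and partition column
val df = spark.read.parquet("<base_parquet_path>")
val part2 = df.filter(col("partition_col") === "2")
part2.printSchema() // shows all 5 columns even though partition 2 has only 4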

Upvotes: 3

Views: 3004

Answers (3)

Julien Kronegg

Reputation: 5271

When you read Parquet files, the mergeSchema option has two different behaviors:

  • .option("mergeSchema", false) (default): reads the data from all versions using the latest schema
  • .option("mergeSchema", true): reads the data from all versions using a unified schema (which typically contains all columns of all versions)

In your example, assuming your schema version 1 has 5 columns a, b, c, d, e and your schema version 2 has 4 columns a, b, c, d (i.e. e was removed), you will get the following (see the sketch after this list):

  • .option("mergeSchema", false) (default): reads all the data for columns a, b, c, d
  • .option("mergeSchema", true): reads all the data for columns a, b, c, d, e

Upvotes: 0

notNull

Reputation: 31540

You can specify the required schema (4 columns) while reading the Parquet file!

  • Spark then reads only the fields included in the schema; if a field does not exist in the data, null is returned for it.

Example:

import org.apache.spark.sql.types._

// define only the columns you need
val sch = new StructType().add("i", IntegerType).add("z", StringType)
spark.read.schema(sch).parquet("<parquet_file_path>").show()

// here the data contains column i but no column z, so z is filled with null
//+---+----+
//|  i|   z|
//+---+----+
//|  1|null|
//+---+----+
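
Applied to the question, a sketch (the column names, their types, and the partition column are assumptions):

import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.col

// hypothetical 4-column schema matching the 2nd partition
val sch4 = new StructType()
  .add("a", StringType)
  .add("b", StringType)
  .add("c", StringType)
  .add("d", StringType)

spark.read.schema(sch4)
  .parquet("<base_parquet_path>")
  .filter(col("part") === 2) // "part" is a hypothetical partition column
  .show()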

Upvotes: 4

maxgruber19

Reputation: 163

I would really like to help you, but I am not sure what you actually want to achieve. What is your intention here?

If you want to read the Parquet file with all its partitions and just want the columns from both partitions, maybe the read option "mergeSchema" fits your need.

Like Protocol Buffer, Avro, and Thrift, Parquet also supports schema evolution. Users can start with a simple schema, and gradually add more columns to the schema as needed. In this way, users may end up with multiple Parquet files with different but mutually compatible schemas. The Parquet data source is now able to automatically detect this case and merge schemas of all these files.

Since schema merging is a relatively expensive operation, and is not a necessity in most cases, we turned it off by default starting from 1.5.0. You may enable it by setting data source option mergeSchema to true when reading Parquet files (as shown in the examples below), or setting the global SQL option spark.sql.parquet.mergeSchema to true.

Refer to the Spark documentation.
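
For instance, a minimal sketch of both ways to enable it (the base path is a placeholder):

// 1) per-read data source option
val dfMerged = spark.read.option("mergeSchema", "true").parquet("<base_parquet_path>")

// 2) global SQL option, applied to subsequent reads
spark.conf.set("spark.sql.parquet.mergeSchema", "true")
val dfMerged2 = spark.read.parquet("<base_parquet_path>")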

So it would be interesting to know which version of Spark you are using and how the properties spark.sql.parquet.mergeSchema (Spark setting) and mergeSchema (client read option) are set.

Upvotes: 1
