hawarden_

Reputation: 2170

Spark: choose default value for MergeSchema fields

I have a parquet file that has an old schema like this:

| name | gender | age |
|------|--------|-----|
| Tom  | Male   | 30  |

And as our schema got updated to:

| name | gender | age | office |
|------|--------|-----|--------|

we used mergeSchema when reading from the old parquet files:

val mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table")

But when reading from these old parquet files, I got the following output:

| name | gender | age | office |
|------|--------|-----|--------|
| Tom  | Male   | 30  | null   |

which is normal. But I would like to use a default value for office (e.g. "California"), if and only if the field is not present in the old schema. Is it possible?

Upvotes: 0

Views: 2566

Answers (1)

Vincent Doba

Reputation: 5078

There is no simple way to set a default value when a column doesn't exist in some parquet files but exists in others.

In the Parquet file format, each parquet file contains its own schema definition. By default, when reading parquet, Spark gets the schema from one parquet file. The only effect of the mergeSchema option is that, instead of retrieving the schema from one arbitrary parquet file, Spark reads the schemas of all parquet files and merges them.

So you can't put a default value without modifying the parquet files.
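For illustration, a minimal sketch (reusing the data/test_table path from the question) of what mergeSchema does and does not do: the merged schema gains the extra column, but nothing supplies a value for it:

// Schema taken from a single parquet footer: "office" may be missing entirely
spark.read.parquet("data/test_table").printSchema()

// Merged schema from all footers: "office" shows up as a nullable column,
// but rows from the old files simply get null; there is no default value mechanism
spark.read.option("mergeSchema", "true").parquet("data/test_table").printSchema()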

The other possible method is to provide your own schema when reading the parquet files, using the .schema() method like this:

spark.read.schema(StructType(Array(StructField("name", StringType), ...))).parquet(...)

But in this case, there is no option to set a default value.
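As an example, here is a hedged sketch of what a complete user-provided schema could look like for this table; even with the extra field declared, rows coming from the old files still end up with office = null:

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Hypothetical explicit schema for the columns in this question
val userSchema = StructType(Array(
  StructField("name", StringType),
  StructField("gender", StringType),
  StructField("age", IntegerType),
  StructField("office", StringType)
))

// Declaring "office" only adds the column; old files still return null for it
spark.read.schema(userSchema).parquet("data/test_table").show()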

So the only remaining solution is to add the column's default value manually.

If we have two parquet files, the first one containing the data with the old schema:

+----+------+---+
|name|gender|age|
+----+------+---+
|Tom |Male  |30 |
+----+------+---+

and the second one containing the data with the new schema:

+-----+------+---+------+
|name |gender|age|office|
+-----+------+---+------+
|Jane |Female|45 |Idaho |
|Roger|Male  |22 |null  |
+-----+------+---+------+

If you don't mind replacing all the null values in the "office" column, you can use .na.fill as follows:

spark.read.option("mergeSchema", "true").parquet(path).na.fill("California", Array("office"))

And you get the following result:

+-----+------+---+----------+
|name |gender|age|office    |
+-----+------+---+----------+
|Jane |Female|45 |Idaho     |
|Roger|Male  |22 |California|
|Tom  |Male  |30 |California|
+-----+------+---+----------+

If you want only the old data to get the default value, you have to read each parquet file into a dataframe, add the column with the default value if necessary, and union all the resulting dataframes:

import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetTable
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// List every parquet file under the path, then read each one separately
ParquetTable("my_table",
  sparkSession = spark,
  options = CaseInsensitiveStringMap.empty(),
  paths = Seq(path),
  userSpecifiedSchema = None,
  fallbackFileFormat = classOf[ParquetFileFormat]
).fileIndex.allFiles().map(file => {
  val dataframe = spark.read.parquet(file.getPath.toString)

  // Only the files that lack the "office" column get the default value
  if (dataframe.columns.contains("office")) {
    dataframe
  } else {
    dataframe.withColumn("office", lit("California"))
  }
}).reduce(_ unionByName _)

And you get the following result:

+-----+------+---+----------+
|name |gender|age|office    |
+-----+------+---+----------+
|Jane |Female|45 |Idaho     |
|Roger|Male  |22 |null      |
|Tom  |Male  |30 |California|
+-----+------+---+----------+

Note that the whole ParquetTable([...]).fileIndex.allFiles() part is only there to retrieve the list of parquet files. It can be simplified if you are on Hadoop or on a local file system.
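As a sketch of that simplification, assuming the files sit on a Hadoop-compatible (or local) file system and that path only contains parquet files, the listing can be done with the Hadoop FileSystem API instead:

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.functions.lit

// List the parquet files directly instead of going through ParquetTable
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
val parquetFiles = fs.listStatus(new Path(path))
  .map(_.getPath.toString)
  .filter(_.endsWith(".parquet"))

parquetFiles.map { file =>
  val dataframe = spark.read.parquet(file)
  if (dataframe.columns.contains("office")) dataframe
  else dataframe.withColumn("office", lit("California"))
}.reduce(_ unionByName _)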

Upvotes: 1
