Reputation: 2170
I have a Parquet file written with an old schema like this:
| name | gender | age |
| Tom | Male | 30 |
Our schema was then updated to:
| name | gender | age | office |
so we used mergeSchema when reading the old Parquet files:
val mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table")
But when reading from these old parquet files, I got the following output :
| name | gender | age | office |
| Tom | Male | 30 | null |
which is expected. However, I would like office to take a default value
(e.g. "California") if and only if the field is not present in the old schema. Is that possible?
Upvotes: 0
Views: 2566
Reputation: 5078
There is no simple way to set a default value for a column that exists in some Parquet files but not in others.
In the Parquet file format, each file contains its own schema definition. By default, when reading Parquet, Spark gets the schema from the files themselves. The only effect of the mergeSchema option is that, instead of taking the schema from one arbitrary Parquet file, Spark reads the schemas of all the Parquet files and merges them.
So you can't put a default value without modifying the parquet files.
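To make the effect of mergeSchema concrete, here is a minimal sketch that can be pasted into spark-shell. It assumes a local SparkSession and a scratch temporary directory (both are illustrative choices, not part of the original question). It writes one batch with the old schema and one with the new schema, then reads the directory with and without the option.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session is good enough for the demonstration
val spark = SparkSession.builder().master("local[*]").appName("merge-schema-demo").getOrCreate()
import spark.implicits._

// Assumption: a throwaway directory stands in for "data/test_table"
val path = java.nio.file.Files.createTempDirectory("test_table").toString

// One batch written with the old schema, one with the new schema
Seq(("Tom", "Male", 30)).toDF("name", "gender", "age")
  .write.mode("append").parquet(path)
Seq(("Jane", "Female", 45, "Idaho")).toDF("name", "gender", "age", "office")
  .write.mode("append").parquet(path)

// Without mergeSchema, the schema comes from only some of the files,
// so "office" may or may not appear here
spark.read.parquet(path).printSchema()

// With mergeSchema, the schemas of all files are merged,
// and rows from old files get null in "office"
val merged = spark.read.option("mergeSchema", "true").parquet(path)
merged.printSchema()
merged.show()
```

Nothing in the merged result records which rows came from a file that lacked the column, which is why a default cannot be applied at this stage.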
The other possible approach is to provide your own schema when reading the Parquet files, by calling .schema() on the reader like this:
spark.read.schema(StructType(Array(StructField("name", StringType), ...))).parquet(...)
But in this case, there is no option to set a default value.
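As an illustration of the explicit-schema approach, the sketch below spells out a full schema for the four columns from the question and applies it to a file written with the old schema. The column types (string and integer) and the temporary directory are assumptions made for the example. The missing "office" column simply comes back as null.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val spark = SparkSession.builder().master("local[*]").appName("explicit-schema-demo").getOrCreate()
import spark.implicits._

// Assumption: a throwaway directory holding one file with the old schema
val oldPath = java.nio.file.Files.createTempDirectory("old_table").toString
Seq(("Tom", "Male", 30)).toDF("name", "gender", "age")
  .write.mode("overwrite").parquet(oldPath)

// The new schema, declared by hand (types are assumed from the sample data)
val newSchema = StructType(Array(
  StructField("name", StringType),
  StructField("gender", StringType),
  StructField("age", IntegerType),
  StructField("office", StringType)
))

// Columns absent from the file are read as null; no default can be given here
val withSchema = spark.read.schema(newSchema).parquet(oldPath)
withSchema.show()
```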
So the only remaining solution is to add the default value for the column manually.
Suppose we have two Parquet files, the first one containing the data with the old schema:
+----+------+---+
|name|gender|age|
+----+------+---+
|Tom |Male |30 |
+----+------+---+
and second one containing the data with the new schema:
+-----+------+---+------+
|name |gender|age|office|
+-----+------+---+------+
|Jane |Female|45 |Idaho |
|Roger|Male |22 |null |
+-----+------+---+------+
If you don't mind replacing every null value in the "office" column, including nulls that were written with the new schema, you can use .na.fill as follows:
spark.read.option("mergeSchema", "true").parquet(path).na.fill("California", Array("office"))
And you get the following result:
+-----+------+---+----------+
|name |gender|age|office |
+-----+------+---+----------+
|Jane |Female|45 |Idaho |
|Roger|Male |22 |California|
|Tom |Male |30 |California|
+-----+------+---+----------+
If you want only the old data to get the default value, you have to read each Parquet file into its own dataframe, add the column with the default value where it is missing, and union all the resulting dataframes:
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetTable
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// List every parquet file under `path`, then read each one separately
ParquetTable("my_table",
  sparkSession = spark,
  options = CaseInsensitiveStringMap.empty(),
  paths = Seq(path),
  userSpecifiedSchema = None,
  fallbackFileFormat = classOf[ParquetFileFormat]
).fileIndex.allFiles().map(file => {
  val dataframe = spark.read.parquet(file.getPath.toString)
  // Only files written with the old schema lack the "office" column
  if (dataframe.columns.contains("office")) {
    dataframe
  } else {
    dataframe.withColumn("office", lit("California"))
  }
}).reduce(_ unionByName _)
And you get the following result:
+-----+------+---+----------+
|name |gender|age|office |
+-----+------+---+----------+
|Jane |Female|45 |Idaho |
|Roger|Male |22 |null |
|Tom |Male |30 |California|
+-----+------+---+----------+
Note that the whole ParquetTable([...]).fileIndex.allFiles() part is only there to retrieve the list of Parquet files. It can be simplified if you are on Hadoop or on a local file system.
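One possible simplification, sketched here under several assumptions, lists the files with the Hadoop FileSystem API instead of ParquetTable. The helper name readWithDefaultOffice is hypothetical, and the sketch assumes a flat, non-partitioned directory that contains at least one file whose name ends in ".parquet" (listStatus is not recursive, and reduce fails on an empty list).

```scala
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.lit

// Hypothetical helper: reads each parquet file under `path` separately and
// adds office = "California" only to files that lack the column
def readWithDefaultOffice(spark: SparkSession, path: String): DataFrame = {
  val dir = new Path(path)
  // Resolves to HDFS, the local file system, etc. depending on the path's scheme
  val fs = dir.getFileSystem(spark.sparkContext.hadoopConfiguration)

  // Assumption: data files sit directly in `dir` and end with ".parquet"
  val files = fs.listStatus(dir).map(_.getPath).filter(_.getName.endsWith(".parquet"))

  files.map { file =>
    val dataframe = spark.read.parquet(file.toString)
    if (dataframe.columns.contains("office")) dataframe
    else dataframe.withColumn("office", lit("California"))
  }.reduce(_ unionByName _)
}
```

Calling readWithDefaultOffice(spark, path) on the two-file example above should leave Roger's null untouched while Tom's row gets the default, since only Tom's file lacks the column.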
Upvotes: 1