Reputation: 11835
I'm importing a collection from MongoDB into Spark. All the documents have a field 'data', which in turn is a structure with a field 'configurationName' (which is always null).
val partitionDF = spark.read.format("com.mongodb.spark.sql.DefaultSource").option("database", "db").option("collection", collectionName).load()
For the data column in the resulting DataFrame, I get this type:
StructType(StructField(configurationName,NullType,true), ...
When I try to save the DataFrame as Parquet
partitionDF.write.mode("overwrite").parquet(collectionName + ".parquet")
I get the following error:
AnalysisException: Parquet data source does not support struct<configurationName:null, ...
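For reference, the same error can be reproduced without MongoDB by building a DataFrame with a nested NullType by hand (a minimal sketch; the column and path names here are made up):
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// A nested struct with a NullType field, similar to what gets inferred for always-null fields
val reproSchema = StructType(Seq(
  StructField("data", StructType(Seq(
    StructField("configurationName", NullType, nullable = true),
    StructField("someValue", StringType, nullable = true)
  )), nullable = true)
))
val reproDF = spark.createDataFrame(
  spark.sparkContext.parallelize(Seq(Row(Row(null, "x")))),
  reproSchema
)
// Fails the same way: Parquet data source does not support struct<configurationName:null, ...
reproDF.write.mode("overwrite").parquet("repro.parquet")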
It looks like the problem is that I have that NullType buried in the data column's type. I'm looking at How to handle null values when writing to parquet from Spark, but it only shows how to solve this NullType problem for top-level columns.
But how do you solve this problem when a NullType is not at the top level? The only idea I have so far is to flatten the dataframe completely (exploding arrays and so on), so that all the NullTypes pop up as top-level columns. But then I would lose the original structure of the data (which I don't want to lose).
Is there a better solution?
Upvotes: 1
Views: 2579
Reputation: 11835
Building on How to handle null values when writing to parquet from Spark and How to pass schema to create a new Dataframe from existing Dataframe? (the second is suggested by @pasha701, thanks!), I constructed this:
// Rebuilds a StructType, fixing the type of each field recursively
def denullifyStruct(struct: StructType): StructType = {
  val items = struct.map { field =>
    StructField(field.name, denullify(field.dataType), field.nullable, field.metadata)
  }
  StructType(items)
}

// Descends into structs and arrays, replacing every NullType with StringType
def denullify(dt: DataType): DataType = {
  dt match {
    case struct: StructType => denullifyStruct(struct)
    case array: ArrayType => ArrayType(denullify(array.elementType), array.containsNull)
    case _: NullType => StringType
    case _ => dt
  }
}
which effectively replaces all NullType instances with StringType ones.
And then:
val fixedDF = spark.createDataFrame(partitionDF.rdd, denullifyStruct(partitionDF.schema))
fixedDF.printSchema
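With the NullType fields replaced by StringType, the Parquet write from the question should now go through (assuming the underlying values of those fields really are all null):
// The schema no longer contains NullType anywhere, so Parquet accepts it
fixedDF.write.mode("overwrite").parquet(collectionName + ".parquet")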
Upvotes: 1
Reputation: 5487
@Roman Puchkovskiy: I rewrote your functions using pattern matching.
def deNullifyStruct(struct: StructType): StructType = {
  val items = struct.map { field =>
    StructField(field.name, fixNullType(field.dataType), field.nullable, field.metadata)
  }
  StructType(items)
}

def fixNullType(dt: DataType): DataType = {
  dt match {
    case _: StructType => deNullifyStruct(dt.asInstanceOf[StructType])
    case _: ArrayType =>
      val array = dt.asInstanceOf[ArrayType]
      ArrayType(fixNullType(array.elementType), array.containsNull)
    case _: NullType => StringType
    case _ => dt
  }
}
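Usage would then be the same as in the answer above, for example:
val fixedDF = spark.createDataFrame(partitionDF.rdd, deNullifyStruct(partitionDF.schema))
fixedDF.printSchema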
Upvotes: 1