Roman Puchkovskiy

Reputation: 11835

Writing null values to Parquet in Spark when the NullType is inside a StructType

I'm importing a collection from MongoDB into Spark. All the documents have a field 'data', which in turn is a struct containing a field 'configurationName' (which is always null).

val partitionDF = spark.read.format("com.mongodb.spark.sql.DefaultSource").option("database", "db").option("collection", collectionName).load()

For the data column in the resulting DataFrame, I get this type:

StructType(StructField(configurationName,NullType,true), ...

When I try to save the DataFrame as Parquet

partitionDF.write.mode("overwrite").parquet(collectionName + ".parquet")

I get the following error:

AnalysisException: Parquet data source does not support struct<configurationName:null, ...

It looks like the problem is that I have that NullType buried inside the data column's type. I'm looking at How to handle null values when writing to parquet from Spark, but it only shows how to solve this NullType problem for top-level columns.

But how do you solve this problem when a NullType is not at the top level? The only idea I have so far is to flatten the DataFrame completely (exploding arrays and so on), so that all the NullTypes end up at the top level. But then I would lose the original structure of the data, which I don't want.

Is there a better solution?

Upvotes: 1

Views: 2579

Answers (2)

Roman Puchkovskiy

Reputation: 11835

Building on How to handle null values when writing to parquet from Spark and How to pass schema to create a new Dataframe from existing Dataframe? (the second was suggested by @pasha701, thanks!), I constructed this:

import org.apache.spark.sql.types._

// Rebuild a StructType, fixing the data type of every field.
def denullifyStruct(struct: StructType): StructType = {
  val items = struct.map { field =>
    StructField(field.name, denullify(field.dataType), field.nullable, field.metadata)
  }
  StructType(items)
}

// Recurse into structs and arrays; a bare NullType becomes StringType.
def denullify(dt: DataType): DataType = {
  dt match {
    case struct: StructType => denullifyStruct(struct)
    case array: ArrayType   => ArrayType(denullify(array.elementType), array.containsNull)
    case _: NullType        => StringType
    case _                  => dt
  }
}

which effectively replaces all NullType instances with StringType ones.

And then

val fixedDF = spark.createDataFrame(partitionDF.rdd, denullifyStruct(partitionDF.schema))
fixedDF.printSchema
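
With the NullType fields replaced by StringType, the Parquet write from the question should now go through (a sketch, reusing the same collectionName as above):

fixedDF.write.mode("overwrite").parquet(collectionName + ".parquet")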

Upvotes: 1

Mohana B C

Reputation: 5487

@Roman Puchkovskiy: here is your function rewritten using pattern matching, binding the matched type directly in each case.

def deNullifyStruct(struct: StructType): StructType = {
  val items = struct.map { field =>
    StructField(field.name, fixNullType(field.dataType), field.nullable, field.metadata)
  }
  StructType(items)
}

def fixNullType(dt: DataType): DataType = {
  dt match {
    case struct: StructType => deNullifyStruct(struct)
    case array: ArrayType   => ArrayType(fixNullType(array.elementType), array.containsNull)
    case _: NullType        => StringType
    case _                  => dt
  }
}
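
Usage is the same as with the original function (a sketch, reusing partitionDF and collectionName from the question):

val fixedDF = spark.createDataFrame(partitionDF.rdd, deNullifyStruct(partitionDF.schema))
fixedDF.write.mode("overwrite").parquet(collectionName + ".parquet")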

Upvotes: 1
