Reputation: 11865
I'm importing a collection from MongoDB to Spark:
val partitionDF = spark.read.format("com.mongodb.spark.sql.DefaultSource").option("database", "db").option("collection", collectionName).load()
For the data column in the resulting DataFrame, I get this type:
StructType(StructField(configurationName,NullType,true), ...
so at least some types in some columns are NullType.
As per Writing null values to Parquet in Spark when the NullType is inside a StructType, I try fixing the schema by replacing all NullTypes with StringTypes:
import org.apache.spark.sql.types._

// Rebuild a StructType, replacing every NullType (at any nesting depth) with StringType
def denullifyStruct(struct: StructType): StructType =
  StructType(struct.map { field =>
    StructField(field.name, denullify(field.dataType), field.nullable, field.metadata)
  })

// Recursively replace NullType with StringType inside structs and arrays
def denullify(dt: DataType): DataType = dt match {
  case struct: StructType => denullifyStruct(struct)
  case array: ArrayType   => ArrayType(denullify(array.elementType), array.containsNull)
  case _: NullType        => StringType
  case other              => other
}
val fixedDF = spark.createDataFrame(partitionDF.rdd, denullifyStruct(partitionDF.schema))
Issuing fixedDF.printSchema, I can see that no NullType exists in fixedDF's schema anymore. But when I try to save it to Parquet
fixedDF.write.mode("overwrite").parquet(partitionName + ".parquet")
I get the following error:
Caused by: com.mongodb.spark.exceptions.MongoTypeConversionException: Cannot cast STRING into a NullType (value: BsonString{value='117679.8'})
at com.mongodb.spark.sql.MapFunctions$.convertToDataType(MapFunctions.scala:214)
at com.mongodb.spark.sql.MapFunctions$.$anonfun$documentToRow$1(MapFunctions.scala:37)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
A NullType again!
The same issue occurs when I just count the number of rows: fixedDF.count().
Does Spark infer the schema again when writing to Parquet (or counting)? Is it possible to turn such inference off (or overcome this in some other way)?
Upvotes: 3
Views: 4455
Reputation: 11865
The problem is that, even if you supply a DataFrame with an explicit schema, for some operations (like count() or saving to disk) a Mongo-derived DataFrame will still infer the schema.
To infer the schema, it uses sampling, which means it does not see all of the data while inferring. If it only ever sees a field holding a null value, it infers NullType for that field. Later, when it encounters the same field holding a string, that string cannot be converted to NullType.
So the fundamental problem here is sampling. If your schema is stable and 'dense' (every, or nearly every, document has all fields filled), sampling works well. But if some fields are 'sparse' (null with high probability), sampling can fail.
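With the stock connector, one mitigation is simply to sample more documents; this is only a sketch, assuming connector 2.x where sampleSize is the read option that controls how many documents schema inference inspects, and a sufficiently sparse field can still be missed:
// Sketch: inspect more documents during schema inference.
// sampleSize is a standard read option of the MongoDB Spark connector 2.x;
// raising it only lowers the probability of a bad inference for sparse fields.
val sampledDF = spark.read
  .format("com.mongodb.spark.sql.DefaultSource")
  .option("database", "db")
  .option("collection", collectionName)
  .option("sampleSize", 100000)
  .load()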
A crude solution is to avoid sampling altogether, that is, to infer the schema from the whole collection rather than from a sample. If there is not too much data (or you are able to wait), it could work.
Here is an experimental branch: https://github.com/rpuch/mongo-spark/tree/read-full-collection-instead-of-sampling
The idea is to switch from sampling to reading the whole collection when configured to do so. Introducing a new configuration option is a bit too cumbersome, so I just disable sampling when the 'sampleSize' option is set to 1, like this:
.option("sampleSize", 1) // MAGIC! This effectively turns sampling off, instead the schema is inferred based on general population
In such a case, sampling is avoided altogether. The obvious alternative of sampling with N equal to the collection size would make MongoDB sort a lot of data in memory, which seems problematic; hence I disable sampling completely.
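For completeness, a minimal read sketch, assuming the experimental branch above is built and on the classpath (with the stock connector, sampleSize = 1 just means a one-document sample):
// Sketch: with the experimental branch, sampleSize = 1 is treated as
// "infer the schema from the whole collection instead of a sample".
val fullScanDF = spark.read
  .format("com.mongodb.spark.sql.DefaultSource")
  .option("database", "db")
  .option("collection", collectionName)
  .option("sampleSize", 1) // disables sampling in the experimental branch
  .load()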
Upvotes: 3
Reputation: 5487
The issue is not due to the Parquet write method. The error occurs while reading the data as a DataFrame, because of a type cast problem. This Jira page says we need to add the samplePoolSize option along with the other options while reading data from MongoDB.
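A sketch of what that might look like, assuming a connector version that supports the samplePoolSize read option; the value is only illustrative:
// Sketch: widen the pool of documents from which schema-inference samples are drawn.
// samplePoolSize is assumed to be supported by the connector version in use.
val df = spark.read
  .format("com.mongodb.spark.sql.DefaultSource")
  .option("database", "db")
  .option("collection", collectionName)
  .option("samplePoolSize", 100000)
  .load()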
Upvotes: 1