I use Spark 2.4 with Scala. I have a dataframe and I am trying to replace null values in my array columns with default values (empty arrays).
val emptyStringArray = udf(() => Array.empty[String],
  DataTypes.createArrayType(DataTypes.StringType, false))

def ensureNonNullCol: DataFrame => DataFrame = inputDf => {
  inputDf.select(inputDf.schema.fields.map { f: StructField =>
    f.dataType match {
      case array: ArrayType => new Column(
        AssertNotNull(when(col(f.name).isNull,
          array.elementType match {
            case DataTypes.StringType => emptyStringArray()
          }).otherwise(col(f.name)).expr)
      ).as(f.name)
    }
  }: _*)
}
At the end, I get:
|-- StrAarrayColumn: array (nullable = false)
| |-- element: string (containsNull = true)
How can I get:
|-- StrAarrayColumn: array (nullable = false)
| |-- element: string (containsNull = false)
?
The problem is that your input dataframe's schema already declares the string arrays as possibly containing nulls. Your ensureNonNullCol function receives that dataframe, selects some transformed values, and returns them without ever changing the dataframe's schema, so the element nullability stays as it was.
The solution is to also update the struct type after you select the fields you want in your function:
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.catalyst.expressions.objects.AssertNotNull
import org.apache.spark.sql.functions.{col, when}
import org.apache.spark.sql.types._

def ensureNonNullCol: DataFrame => DataFrame = inputDf => {
  // Rebuild the schema, marking string-array fields as non-nullable
  // with non-nullable elements.
  val newStruct = StructType(inputDf.schema.map { field =>
    field.dataType match {
      case arr: ArrayType if arr.elementType == StringType =>
        field.copy(dataType = arr.copy(containsNull = false), nullable = false)
      case _ => field
    }
  })

  // Replace null string arrays with empty arrays, as before.
  val newDF = inputDf.select(inputDf.schema.fields.map { f: StructField =>
    f.dataType match {
      case array: ArrayType => new Column(
        AssertNotNull(when(col(f.name).isNull,
          array.elementType match {
            case DataTypes.StringType => Array.empty[String]
            case _ => col(f.name)
          }).otherwise(col(f.name)).expr)
      ).as(f.name)
      case _ => col(f.name)
    }
  }: _*)

  // Re-apply the corrected schema via the input's own SparkSession;
  // this round trip is what records the new nullability.
  inputDf.sparkSession.createDataFrame(newDF.rdd, newStruct)
}
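
For reference, a minimal sketch of how to check the result; the column name StrArrayColumn, the sample rows, and the local SparkSession are assumptions for illustration, not from the question:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// One non-null array value and one null array value.
val df = Seq(Some(Seq("a", "b")), None).toDF("StrArrayColumn")

ensureNonNullCol(df).printSchema()
// root
//  |-- StrArrayColumn: array (nullable = false)
//  |    |-- element: string (containsNull = false)

Note that AssertNotNull only affects the nullability the analyzer derives for the selected columns; it is the round trip through createDataFrame with the explicit schema that actually records containsNull = false.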