Marwan02

Reputation: 67

Create a non-nullable empty array column

I use Spark 2.4 with Scala. I have a dataframe, and I am trying to replace the null values in my array columns with default values (empty arrays).

import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.catalyst.expressions.objects.AssertNotNull
import org.apache.spark.sql.functions.{col, udf, when}
import org.apache.spark.sql.types.{ArrayType, DataTypes, StructField}

val emptyStringArray = udf(() => Array.empty[String],
  DataTypes.createArrayType(DataTypes.StringType, false))

def ensureNonNullCol: DataFrame => DataFrame = inputDf => {
  inputDf.select(inputDf.schema.fields.map { f: StructField =>
    f.dataType match {
      case array: ArrayType => new Column(
        AssertNotNull(when(col(f.name).isNull,
          array.elementType match {
            case DataTypes.StringType => emptyStringArray()
          }).otherwise(col(f.name)).expr)
      ).as(f.name)
    }
  }: _*)
}

In the end, I get:

 |-- StrAarrayColumn: array (nullable = false)
 |    |-- element: string (containsNull = true)

How can I get the following instead:

 |-- StrAarrayColumn: array (nullable = false)
 |    |-- element: string (containsNull = false)


Upvotes: 1

Views: 526

Answers (1)

AminMal

Reputation: 3173

The problem is that your input dataframe's schema declares an array of strings whose elements may contain nulls. Your ensureNonNullCol function receives a dataframe with that schema, selects some values, and returns it without ever changing the schema, so containsNull stays true. Before I get to the solution, there are three important points about your code.

    1. Matching only one of many possible cases is dangerous and discouraged: any unhandled case throws a MatchError at runtime (notice how your code matches only ArrayType, and only StringType inside it).
    2. A udf with an empty argument list is discouraged and produces warnings; in my Spark version it even results in a runtime exception.
    3. You can build the empty array of strings inline instead of calling that udf (see the sketch right after this list).
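
Here is a minimal sketch of that inline approach (the dataframe df and the column name "tags" are hypothetical, made up for illustration): typedLit lifts the Scala array into a literal column, so no UDF is required.

import org.apache.spark.sql.functions.{coalesce, col, typedLit}

// typedLit infers ArrayType(StringType) from the Scala type, so no UDF is needed
val emptyStringArray = typedLit(Array.empty[String])

// Hypothetical usage: replace nulls in an array column named "tags"
val patched = df.withColumn("tags", coalesce(col("tags"), emptyStringArray))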

Anyway, the solution is to also update the schema (the StructType) after you select the fields you want inside your function:

import org.apache.spark.sql.functions.{col, typedLit, when}
import org.apache.spark.sql.types.{ArrayType, DataTypes, StringType, StructField, StructType}

def ensureNonNullCol: DataFrame => DataFrame = inputDf => {
    // Target schema: for string arrays, mark both the column (nullable) and its
    // elements (containsNull) as non-nullable -- exactly the schema you asked for
    val newStruct = StructType(inputDf.schema.map { field =>
      field.dataType match {
        case arr: ArrayType if arr.elementType == StringType =>
          field.copy(dataType = arr.copy(containsNull = false), nullable = false)
        case _ => field
      }
    })

    val newDF = inputDf.select(inputDf.schema.fields.map { f: StructField =>
      f.dataType match {
        case array: ArrayType => new Column(
          AssertNotNull(when(col(f.name).isNull,
            array.elementType match {
              // typedLit lifts the Scala array to a literal column; a plain
              // Array literal is not supported by lit on older Spark versions
              case DataTypes.StringType => typedLit(Array.empty[String])
              case _ => col(f.name)
            }).otherwise(col(f.name)).expr)
        ).as(f.name)

        case _ => col(f.name)
      }
    }: _*)

    // Re-create the dataframe so the rows are paired with the stricter schema
    spark.createDataFrame(newDF.rdd, newStruct)
  }
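
And a quick round trip for illustration (the sample rows and the local SparkSession here are made up; ensureNonNullCol picks up spark from the enclosing scope):

import org.apache.spark.sql.{Row, SparkSession}

val spark = SparkSession.builder().master("local[*]").getOrCreate()

val inputSchema = StructType(Seq(
  StructField("StrAarrayColumn", ArrayType(StringType, containsNull = true), nullable = true)
))
val rows = spark.sparkContext.parallelize(Seq(Row(Seq("a", "b")), Row(null)))

ensureNonNullCol(spark.createDataFrame(rows, inputSchema)).printSchema()
// root
//  |-- StrAarrayColumn: array (nullable = false)
//  |    |-- element: string (containsNull = false)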

Upvotes: 1
