Antonio Ye

Reputation: 51

Spark Dataframe na.fill for nested columns

I am trying to set a default value for null values in nested columns in Spark, but it looks like DataFrameNaFunctions.fill does not work on nested columns.

import spark.implicits._

case class Demographics(city: String)
case class Details(age: Int, demographics: Demographics)
case class Person(name: String, details: Details)
case class Data(person: Person)

val data = Seq(
  Data(Person("James", Details(48, demographics = Demographics("Toronto")))),
  Data(Person("Mary", Details(41, demographics = Demographics(null)))),
  Data(null)
).toDS

data.na.fill("default").show(false)
+------------------------+
|person                  |
+------------------------+
|{James, {48, {Toronto}}}|
|{Mary, {41, {NULL}}}    |
|NULL                    |
+------------------------+

What I am expecting:
+------------------------+
|person                  |
+------------------------+
|{James, {48, {Toronto}}}|
|{Mary, {41, {default}}} |
|NULL                    |
+------------------------+

Does anyone know of a way to do this? The main reason I want to set a value is that I need to reference JVM objects that are Java beans, and these fields cannot be null.

val encoder = Encoders.bean(classOf[InputBeanClass])
data.map(row => {
   row
})(encoder).count()

If I run the code above I get the following error:

If the schema is inferred from a Scala tuple/case class, or a Java bean, please try to use scala.Option[_] or other nullable types (e.g. java.lang.Integer instead of int/scala.Int).

Upvotes: 0

Views: 111

Answers (2)

Antonio Ye

Reputation: 51

Using the recommendation in the post "Spark: Replace Null value in a Nested column", I was able to get something working. This is my current solution.

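import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.{col, lit, struct, when}
import org.apache.spark.sql.types._

// Walk the schema recursively and rebuild each struct,
// replacing null numeric leaf fields with a zero default.
def replaceNestedValues(schema: StructType,
                        path: Option[String] = None): Seq[Column] = {
  schema.fields.map(f => {
    // Backtick-quoted path to this field, e.g. `person`.`details`.`age`
    val p = path.fold(s"`${f.name}`")(c => s"$c.`${f.name}`")
    f.dataType match {
      case s: StructType =>
        // Rebuild non-null structs; keep null structs as null
        val c = col(p)
        when(c.isNotNull, struct(replaceNestedValues(s, Some(p)): _*)).otherwise(c).alias(f.name)
      case _: DoubleType =>
        val c = col(p)
        when(c.isNull, lit(0.0)).otherwise(c).alias(f.name)
      case _: IntegerType =>
        val c = col(p)
        when(c.isNull, lit(0)).otherwise(c).alias(f.name)
      case _: LongType =>
        val c = col(p)
        when(c.isNull, lit(0L)).otherwise(c).alias(f.name)
      // Other types, including strings, are passed through unchanged
      case _ => col(p)
    }
  })
}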

def setDefaultValues(df: DataFrame): DataFrame = {
    val replacedCols = replaceNestedValues(df.schema)
    df.select(replacedCols: _*)
}

...

val ds = setDefaultValues(df).as(encoder)

Upvotes: 0

Chris

Reputation: 2841

There isn't a simple, straightforward way to do that. You have two main options:

  1. Use Scala types that do not allow null, i.e. your nested field is never null but "default".
    You could write your own Option-like ADT with Default(X) instead of null. In this case you would need to use something like frameless injections to flatten the nesting structure, along with swapping to its encoding derivation.
  2. Use an interim transformation before using the bean encoder, via withField (since Spark 3.1), on each nullable field to swap null for its field-appropriate value.
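
As a rough illustration of option 2, here is a minimal sketch that assumes Spark 3.1+ and the schema from the question (a Data case class wrapping Person is assumed from the "person" column in the output). The names WithFieldDefaults and fillCity are hypothetical, chosen only for this example. As far as I know, withField leaves a null parent struct as null, so the row whose person is null should stay null, but check that on your Spark version.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{coalesce, col, lit}

// Case classes mirroring the question's schema (Data is an assumption).
case class Demographics(city: String)
case class Details(age: Int, demographics: Demographics)
case class Person(name: String, details: Details)
case class Data(person: Person)

object WithFieldDefaults {
  // Hypothetical helper: replaces a null person.details.demographics.city
  // with `default`. Column.withField accepts a dotted path to a nested field.
  def fillCity(df: DataFrame, default: String): DataFrame =
    df.withColumn(
      "person",
      col("person").withField(
        "details.demographics.city",
        coalesce(col("person.details.demographics.city"), lit(default))
      )
    )

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("withField-sketch")
      .getOrCreate()
    import spark.implicits._

    val data = Seq(
      Data(Person("James", Details(48, Demographics("Toronto")))),
      Data(Person("Mary", Details(41, Demographics(null)))),
      Data(null)
    ).toDS()

    // Apply the interim transformation before converting with the bean encoder.
    fillCity(data.toDF(), "default").show(false)
    spark.stop()
  }
}
```

You would need one withField call per nullable leaf field, so for wide schemas a schema-driven helper like the one in the other answer may be less repetitive.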

Upvotes: 0
