Reputation: 51
I am trying to set a default value for nested columns with null values in Spark, but it looks like the DataFrameNaFunctions.fill function does not work for nested columns.
import spark.implicits._

case class Demographics(city: String)
case class Details(age: Int, demographics: Demographics)
case class Person(name: String, details: Details)
case class Data(person: Person)

val data = Seq(
  Data(Person("James", Details(48, demographics = Demographics("Toronto")))),
  Data(Person("Mary", Details(41, demographics = Demographics(null)))),
  Data(null)
).toDS
data.na.fill("default").show(false)
+------------------------+
|person |
+------------------------+
|{James, {48, {Toronto}}}|
|{Mary, {41, {NULL}}} |
|NULL |
+------------------------+
What I am expecting:
+------------------------+
|person |
+------------------------+
|{James, {48, {Toronto}}}|
|{Mary, {41, {default}}} |
|NULL |
+------------------------+
Does anyone know of a way to do this? The main reason I want to set a default value is that I need to map these rows to JVM objects (Java beans), and those fields cannot be null.
val encoder = Encoders.bean(classOf[InputBeanClass])
data.map(row => {
  row
})(encoder).count()
If I run the code above, I get the following error:
If the schema is inferred from a Scala tuple/case class, or a Java bean, please try to use scala.Option[_] or other nullable types (e.g. java.lang.Integer instead of int/scala.Int).
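As the error message hints, if changing the classes were an option, declaring the nullable fields as Option (or boxed types such as java.lang.Integer) would sidestep the problem entirely. A minimal sketch of that alternative, reusing the question's class names:

```scala
// Sketch: Option-typed fields let the encoder represent missing values
// without resorting to sentinel defaults.
case class Demographics(city: Option[String])
case class Details(age: Int, demographics: Option[Demographics])
```

This only works when the consuming beans can tolerate missing values, which is not the case here.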
Upvotes: 0
Views: 111
Reputation: 51
Using the recommendation in this post: Spark: Replace Null value in a Nested column, I was able to get something working. This is my current solution:
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{col, lit, struct, when}
import org.apache.spark.sql.types.{DoubleType, IntegerType, LongType, StructType}

def replaceNestedValues(schema: StructType,
                        path: Option[String] = None): Seq[Column] = {
  schema.fields.map { f =>
    // Build the backtick-quoted path to this field, e.g. `person`.`details`
    val p = path.fold(s"`${f.name}`")(c => s"$c.`${f.name}`")
    f.dataType match {
      case s: StructType =>
        // Recurse into nested structs; a struct that is itself null stays null
        val c = col(p)
        when(c.isNotNull, struct(replaceNestedValues(s, Some(p)): _*).alias(f.name))
          .otherwise(c)
          .alias(f.name)
      case _: DoubleType =>
        val c = col(p)
        when(c.isNull, lit(0.0)).otherwise(c).alias(f.name)
      case _: IntegerType =>
        val c = col(p)
        when(c.isNull, lit(0)).otherwise(c).alias(f.name)
      case _: LongType =>
        val c = col(p)
        when(c.isNull, lit(0L)).otherwise(c).alias(f.name)
      case _ =>
        col(p)
    }
  }
}
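The match above only defaults numeric types; since the original question asked for a string default, a StringType branch could be added in the same pattern (a sketch, not part of the solution as posted):

```scala
// Hypothetical extra branch for replaceNestedValues:
case _: StringType =>
  val c = col(p)
  when(c.isNull, lit("default")).otherwise(c).alias(f.name)
```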
def setDefaultValues(df: DataFrame): DataFrame = {
  val replacedCols = replaceNestedValues(df.schema)
  df.select(replacedCols: _*)
}
...
val ds = setDefaultValues(df).as(encoder)
Upvotes: 0
Reputation: 2841
There isn't a straightforward way to do that. You have two main options:
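For a single known field on Spark 3.1+, one possibility (a sketch against the question's schema; this may or may not be one of the options meant here) is Column.withField combined with coalesce:

```scala
import org.apache.spark.sql.functions.{coalesce, col, lit}

// Replace only the nested city field; a row where person itself
// is null stays null, matching the expected output above.
val fixed = df.withColumn(
  "person",
  col("person").withField(
    "details.demographics.city",
    coalesce(col("person.details.demographics.city"), lit("default"))
  )
)
```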
Upvotes: 0