Tom Lous
Tom Lous

Reputation: 2899

Spark default null columns DataSet

I cannot make Spark read a json (or csv for that matter) as Dataset of a case class with Option[_] fields where not all fields are defined in source.

It's a bit cryptic, but let's say I have a case class called CustomData

Given the following json file (customA.json):

{"id":123, "colA": "x", "colB": "z"}
{"id":456, "colA": "y"}
{"id":789,              "colB": "a"}

And the following code:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[2]")
  .appName("test")
  .getOrCreate()

import spark.implicits._

case class CustomData(id: BigInt, colA: Option[String], colB: Option[String])
org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this)

val ds = spark
  .read
  .option("mode", "PERMISSIVE")
  .json("src/main/resources/customA.json")
  .as[CustomData]
  .show()

The output is - as expected - :

+----+----+---+
|colA|colB| id|
+----+----+---+
|   x|   z|123|
|   y|null|456|
|null|   a|789|
+----+----+---+

Even though not all columns are defined always. But if I want to use the same code for reading a file where one of the columns appears nowhere I cannot make it happen:

For the other json file (customB.json):

{"id":321, "colA": "x"}
{"id":654, "colA": "y"}
{"id":987}

And the additional code:

  val ds2 = spark
  .read
  .option("mode", "PERMISSIVE")
  .json("src/main/resources/customB.json")
  .as[CustomData]
  .show()

The output is an error:

org.apache.spark.sql.AnalysisException: cannot resolve 'colB' given input columns: [colA, id];

Which makes sense, but I'd love to reuse the same case class for both files. Especially if I don't know wether colB even appears in the json file before ingesting it.

Of course I can make checks, but is there a way to convert non-existing columns to null (as with customA.json). Setting readmode to Permissive doesn't seem to change anything.

Am I missing something?

Upvotes: 4

Views: 1880

Answers (2)

grepIt
grepIt

Reputation: 116

Here is an even simpler solution:

    import org.apache.spark.sql.types.StructType
    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.catalyst.ScalaReflection
    import scala.reflect.runtime.universe._

val structSchema = ScalaReflection.schemaFor[CustomData].dataType.asInstanceOf[StructType]
val df = spark.read.schema(structSchema).json(jsonRDD)

Upvotes: 1

Tom Lous
Tom Lous

Reputation: 2899

I'll put an answer down here. To show you what (sort of) works, but looks very hacky IMHO.

By extending the DataFrame with a method to force the StructType of a case class on top of the already existing StructType it actually works, but maybe (I really hope) there are better / cleaner solutions.

Here goes:

import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.sql.catalyst.ScalaReflection
import scala.reflect.runtime.universe._

case class DataFrameExtended(dataFrame: DataFrame) {

  def forceMergeSchema[T: TypeTag]: DataFrame = {
    ScalaReflection
      .schemaFor[T]
      .dataType
      .asInstanceOf[StructType]
      .filterNot(
        field => dataFrame.columns.contains(field.name)
      )
      .foldLeft(dataFrame){
        case (newDf, field) => newDf.withColumn(field.name, lit(null).cast(field.dataType))
      }
  }
}

implicit def dataFrameExtended(df: DataFrame): DataFrameExtended = {
  DataFrameExtended(df)
}

val ds2 = spark
  .read
  .option("mode", "PERMISSIVE")
  .json("src/main/resources/customB.json")
  .forceMergeSchema[CustomData]
  .as[CustomData]
  .show()

Now show a result I was hoping for:

+----+---+----+
|colA| id|colB|
+----+---+----+
|   x|321|null|
|   y|654|null|
|null|987|null|
+----+---+----+

I've tried this only with scalar types like (Int, String, etc) I think more complex structures will fail horribly. So I'm still looking for the better answer.

Upvotes: 4

Related Questions