Reputation: 2899
I cannot make Spark read a JSON (or CSV, for that matter) as a Dataset of a case class with Option[_] fields when not all fields are defined in the source.
It's a bit cryptic, but let's say I have a case class called CustomData.
Given the following JSON file (customA.json):
{"id":123, "colA": "x", "colB": "z"}
{"id":456, "colA": "y"}
{"id":789, "colB": "a"}
And the following code:
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[2]")
  .appName("test")
  .getOrCreate()

import spark.implicits._

case class CustomData(id: BigInt, colA: Option[String], colB: Option[String])
org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this)

val ds = spark
  .read
  .option("mode", "PERMISSIVE")
  .json("src/main/resources/customA.json")
  .as[CustomData]
  .show()
The output is, as expected:
+----+----+---+
|colA|colB| id|
+----+----+---+
|   x|   z|123|
|   y|null|456|
|null|   a|789|
+----+----+---+
Even though not all columns are always defined, this works. But if I want to use the same code to read a file where one of the columns appears nowhere, I cannot make it happen.
For the other JSON file (customB.json):
{"id":321, "colA": "x"}
{"id":654, "colA": "y"}
{"id":987}
And the additional code:
val ds2 = spark
  .read
  .option("mode", "PERMISSIVE")
  .json("src/main/resources/customB.json")
  .as[CustomData]
  .show()
The output is an error:
org.apache.spark.sql.AnalysisException: cannot resolve 'colB' given input columns: [colA, id];
Which makes sense, but I'd love to reuse the same case class for both files, especially since I don't know whether colB even appears in the JSON file before ingesting it.
Of course I can add checks, but is there a way to convert non-existing columns to null (as happens with customA.json)? Setting the read mode to PERMISSIVE doesn't seem to change anything.
Am I missing something?
Upvotes: 4
Views: 1880
Reputation: 116
Here is an even simpler solution:
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.sql.catalyst.ScalaReflection
import scala.reflect.runtime.universe._

// Derive the schema from the case class and hand it to the reader explicitly;
// columns declared in the schema but missing from the file come back as null.
val structSchema = ScalaReflection.schemaFor[CustomData].dataType.asInstanceOf[StructType]
val df = spark.read.schema(structSchema).json("src/main/resources/customB.json")
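Applied to the question's customB.json (a sketch, assuming the SparkSession and the CustomData case class from the question are in scope), colB is declared in the schema even though it never appears in the file, so it simply comes back as null and the conversion to a Dataset goes through:

val dsB = df.as[CustomData]   // df was read with the explicit schema above
dsB.show()                    // colB is null on every row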
Upvotes: 1
Reputation: 2899
I'll put an answer down here to show what (sort of) works, though it looks very hacky IMHO.
By extending the DataFrame with a method that forces the StructType of a case class on top of the already existing StructType, it actually works, but maybe (I really hope) there are better / cleaner solutions.
Here goes:
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.sql.catalyst.ScalaReflection
import scala.reflect.runtime.universe._

case class DataFrameExtended(dataFrame: DataFrame) {
  def forceMergeSchema[T: TypeTag]: DataFrame = {
    ScalaReflection
      .schemaFor[T]
      .dataType
      .asInstanceOf[StructType]
      .filterNot(
        field => dataFrame.columns.contains(field.name)
      )
      .foldLeft(dataFrame) {
        // Every field of the case class that the DataFrame lacks is added as a null column.
        case (newDf, field) => newDf.withColumn(field.name, lit(null).cast(field.dataType))
      }
  }
}

implicit def dataFrameExtended(df: DataFrame): DataFrameExtended = {
  DataFrameExtended(df)
}
val ds2 = spark
  .read
  .option("mode", "PERMISSIVE")
  .json("src/main/resources/customB.json")
  .forceMergeSchema[CustomData]
  .as[CustomData]
  .show()
This now shows the result I was hoping for:
+----+---+----+
|colA| id|colB|
+----+---+----+
|   x|321|null|
|   y|654|null|
|null|987|null|
+----+---+----+
I've tried this only with scalar types (Int, String, etc.); I think more complex structures will fail horribly. So I'm still looking for a better answer.
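To make "more complex structures" concrete: a hypothetical nested shape like the one below is exactly the kind of case that hasn't been tried here, since it is unclear whether lit(null).cast(field.dataType) behaves sensibly when the missing field's data type is itself a StructType.

// Hypothetical nested case classes, not covered by the experiment above.
case class Address(street: Option[String], city: Option[String])
case class CustomDataNested(id: BigInt, colA: Option[String], address: Option[Address])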
Upvotes: 4