Reputation: 13686
Assume you have written a collection of some case class to parquet, and then in another Spark job would like to read it back into the same case class (that is, you've written some List[MyCaseClass] and would like to read it back). For generality, assume MyCaseClass has nested case classes in it.
Currently I can get this working only using this code blueprint:
/** Applies the secret sauce for coercing to a case class, implemented via Spark's flatMap. */
private def toCaseClass(spark: SparkSession, inputDF: DataFrame): Dataset[MyCaseClass] = {
  import spark.implicits._
  inputDF.as[MyCaseClass].flatMap(record => {
    Iterator[MyCaseClass](record)
  })
}
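For concreteness, here is roughly the round trip this blueprint sits in; the case classes and path below are simplified stand-ins for my real ones:
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

// Simplified stand-ins for the real (nested) case classes.
case class Nested(label: String)
case class MyCaseClass(id: Int, nested: Nested)

val spark = SparkSession.builder.getOrCreate()
import spark.implicits._

// First job: write a List[MyCaseClass] to parquet.
val records = List(MyCaseClass(1, Nested("a")), MyCaseClass(2, Nested("b")))
records.toDS.write.mode("overwrite").parquet("/tmp/my-case-class")

// Second job: read the parquet back and coerce it to the case class via the blueprint above.
val restored: Dataset[MyCaseClass] = toCaseClass(spark, spark.read.parquet("/tmp/my-case-class"))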
It seems that in Spark 2.x, flatMap leads into experimental Spark code that does the conversion/coercion (that code is annotated as experimental in the Spark code base, as can be seen when stepping through it with a debugger). Obviously, serialization is typically a thorny issue in Java/Scala. Are there additional, safe ways?
Outside of Spark, I've found the stand-alone code solutions suggested elsewhere on Stack Overflow shaky and poorly supported.
I'm looking for clean, declarative ways that do not require hand-coding how to convert each and every field, that rely on well-supported, solid libraries, and that do not rely on mega-slow reflection in a way that defeats the elegance. Probably an impossible mix of desiderata, but that would be just what you'd expect of a language proud of its case classes and counting Spark as one of its major accomplishments.
Conversely, comments about why not to use case classes are also welcome!
Upvotes: 4
Views: 2287
Reputation: 6897
As Luis Miguel has commented, most of the Dataset API is marked experimental but has been stable and used in production for several years now.
The issue with Dataset.as[U]
You're quite correct that simply using .as[MyCaseClass] differs in several subtle ways from explicitly instantiating the case class. The most important one is that Dataset.as[U] does not guarantee that your dataset only contains the columns defined by type U; it may keep additional data that can break computations later.
Here's an example:
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
case class MyData(value: Int)
val df: DataFrame = spark.createDataset(Seq(1,1,2,3)).withColumn("hidden",rand)
val ds: Dataset[MyData] = df.as[MyData]
ds.distinct.count
res3: Long = 4
The Dataset ds retains the hidden column even though it's not defined in the MyData type, and that may generate unexpected results: someone looking at the dataset ds above as a collection of MyData would certainly expect the distinct count to be 3 and not 4.
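You can make the problem visible by printing the schema of ds; the extra column is still part of the dataset (output roughly as a spark-shell would show it):
ds.printSchema
root
 |-- value: integer (nullable = false)
 |-- hidden: double (nullable = false)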
How to safely convert to a Dataset[MyData]?
If you explicitly want to keep only your case class columns in your Dataset, there's a very simple solution (but with sub-optimal performance): extract it as an RDD and convert it back to a Dataset[U].
val ds = df.as[MyData].rdd.toDS()
ds.distinct.count
res5: Long = 3
It basically does exactly what your flatMap is doing, at the same cost: Spark needs to deserialize the data from its internal row format to create a case class instance and re-serialize it back to internal rows.
This generates unnecessary garbage, increases memory pressure and may break whole-stage codegen optimizations.
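One quick way to see that boundary, if you're curious, is to look at the physical plan: after the RDD round trip it should be rooted at a scan of an external RDD of objects rather than the original source (exact node names vary across Spark versions):
// The plan now starts from the in-memory object RDD (with a serialize-from-object step),
// i.e. the data left Spark's internal row format and came back.
df.as[MyData].rdd.toDS().explain()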
The better way, in my opinion, is simply to select the necessary columns from the source DataFrame as you cast it to the specified case class. This prevents most of the unwanted side effects of as[U] without incurring the deserialization/serialization cost.
An elegant way to do it is to leverage Scala's ability to extend the behavior of existing classes and instances with implicit classes:
import scala.reflect.runtime.universe.TypeTag
import org.apache.spark.sql._

object SparkExtensions {
  implicit class ExtendedDataFrame(df: DataFrame) {
    def to[T <: Product: TypeTag]: Dataset[T] = {
      import df.sparkSession.implicits._
      import org.apache.spark.sql.functions.col
      df.select(Encoders.product[T].schema.map(f => col(f.name)): _*).as[T]
    }
  }
}
With the above object, I can now modify my initial code:
import SparkExtensions._
val ds: Dataset[MyData] = df.to[MyData]
ds.distinct.count
res11: Long = 3
ds.printSchema
root
|-- value: integer (nullable = false)
Upvotes: 5
Reputation: 2495
I've worked with some pretty complex and nested case class types, and have never had to do the identity .flatMap() that you have.
Generally, I just make sure I have an implicit Encoder in scope, and Spark seems happy enough just using .as[MyCaseClass] to convert a DataFrame to a Dataset.
I have a pretty common pattern of:
implicit def enc: Encoder[MyCaseClass] = Encoders.product[MyCaseClass]
Naturally, you'd have to have a separate encoder for each of your nested types as well. As long as they all extend Product (as a case class does), Encoders.product[T] works.
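A minimal sketch of that pattern with a hypothetical nested type (the names below are illustrative, not from the question):
import org.apache.spark.sql.{DataFrame, Dataset, Encoder, Encoders}

// Illustrative nested case classes.
case class Address(city: String, zip: String)
case class Person(name: String, address: Address)

// One product encoder per case class, nested types included.
implicit def addressEnc: Encoder[Address] = Encoders.product[Address]
implicit def personEnc: Encoder[Person] = Encoders.product[Person]

// With the encoders in scope, .as[Person] is enough.
def toPeople(df: DataFrame): Dataset[Person] = df.as[Person]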
Upvotes: 0