Reputation: 31
I'm trying to update my app from Spark 1.6.2 to 2.0.0. My problem is creating a Dataset from a DataFrame (a Parquet file that I read).
I know I can use a case class or a tuple to type the DataFrame and get a Dataset, but before runtime I don't know which data the user will load, so I don't know the columns' types or how many there are (a quick sketch of what I mean is below).
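For illustration, a minimal sketch of the static approach I mean, with a hypothetical case class Record and a hypothetical Parquet path:
import org.apache.spark.sql.SparkSession

// Hypothetical schema known at compile time
case class Record(name: String, value: Double)

val spark = SparkSession.builder.appName("example").master("local").getOrCreate()
import spark.implicits._

// Typing the DataFrame works here only because Record is known before runtime
val ds = spark.read.parquet("data.parquet").as[Record]
This is exactly what I can't do, because the schema is only known when the user loads the data.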
To load the data, I read the Parquet file with a SparkSession, simply like:
spark.read.schema(schema).parquet(dataPath)
schema here is a StructType instantiated from a List[Map[String, String]] that contains each column's name and its type (which is either String or Double).
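For reference, a minimal sketch of how I build that StructType; the exact Map layout (a "name" and a "type" key per column) is just an illustration of my structure:
import org.apache.spark.sql.types._

// Hypothetical user-supplied description: one Map per column
val columnSpecs: List[Map[String, String]] = List(
  Map("name" -> "title",  "type" -> "String"),
  Map("name" -> "rating", "type" -> "Double")
)

val schema = StructType(columnSpecs.map { spec =>
  val dataType = spec("type") match {
    case "Double" => DoubleType
    case _        => StringType
  }
  StructField(spec("name"), dataType, nullable = true)
})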
I found this on Stack Overflow, but I struggle to understand it, and I wonder if there isn't an easier way to solve my problem: Dynamically compiling scala class files at runtime in Scala 2.11
Thanks
Upvotes: 3
Views: 2295
Create an implicit conversion from Spark data types to Scala native data types, then map that conversion over the StructFields of the DataFrame's schema to build a case class definition as a string.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

val spark = SparkSession
  .builder
  .appName("Movies Reviews")
  .config("spark.master", "local")
  .getOrCreate()
import spark.implicits._

// Sample DataFrame used to demonstrate the generated case class
val someDF = Seq(
  (8, "bat"),
  (64, "mouse"),
  (-27, "horse")
).toDF("number", "word")

someDF.printSchema()
// Generates the source of a case class matching the given schema;
// nullable columns become Option[...] fields.
def schemaCaseClass(schema: StructType, className: String)
                   (implicit sparkTypeScala: DataType => String): String = {
  def structField(col: StructField): String = {
    val sparkTypes = sparkTypeScala(col.dataType)
    col match {
      case x if x.nullable => s" ${col.name}:Option[$sparkTypes]"
      case _               => s" ${col.name}:$sparkTypes"
    }
  }
  val fieldsName = schema.map(structField).mkString(",\n ")
  s"""
     |case class $className (
     | $fieldsName
     |)
  """.stripMargin
}
// Implicit mapping from Spark SQL data types to Scala type names
implicit val scalaTypes: DataType => String = {
  case _: ByteType      => "Byte"
  case _: ShortType     => "Short"
  case _: IntegerType   => "Int"
  case _: LongType      => "Long"
  case _: FloatType     => "Float"
  case _: DoubleType    => "Double"
  case _: DecimalType   => "java.math.BigDecimal"
  case _: StringType    => "String"
  case _: BinaryType    => "Array[Byte]"
  case _: BooleanType   => "Boolean"
  case _: TimestampType => "java.sql.Timestamp"
  case _: DateType      => "java.sql.Date"
  case _: ArrayType     => "scala.collection.Seq"
  case _: MapType       => "scala.collection.Map"
  case _: StructType    => "org.apache.spark.sql.Row"
  case _                => "String"
}
println(schemaCaseClass(someDF.schema, "someDF"))
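Given someDF's schema (number is a non-nullable Int and word a nullable String when built with toDF from a Seq of tuples), the printed definition should look roughly like:
case class someDF (
  number:Int,
  word:Option[String]
)
Note that this only produces the source code of the case class as a String; to actually type a Dataset with it you would still need to compile that source at runtime (for instance with scala.tools.reflect.ToolBox, along the lines of the approach linked in the question).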
Upvotes: 0