Yanmanlik

Reputation: 31

Scala 2.11 & Spark 2.0.0: dynamically create a case class to encode a Dataset

I'm trying to update my app from Spark 1.6.2 to 2.0.0; my problem is creating a Dataset from a DataFrame (Parquet data that I read).

I know that I can use a case class or a tuple to type the DataFrame and get a Dataset, but before runtime I don't know which data the user will load, so I don't know the column types or how many columns there are.
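For context, this is roughly what the static approach looks like when the schema is known at compile time (the Record case class and its columns are made up for illustration; spark is an existing SparkSession):

    case class Record(number: Int, word: String)

    import spark.implicits._

    // Typed read: .as[Record] turns the DataFrame into a Dataset[Record]
    val typedDs = spark.read.parquet(dataPath).as[Record]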

To load the data, I read Parquet files with a SparkSession, simply:

spark.read.schema(schema).parquet(dataPath)

Here schema is a StructType instantiated from a List[Map[String, String]] that contains each column's name and its type (which is either String or Double).
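For example, building such a StructType from that list could look like the sketch below (schemaFromSpec and the sample column specs are illustrative, not from my actual code):

    import org.apache.spark.sql.types._

    // Illustrative helper: turn the List[Map[String, String]] description
    // into a StructType; each map holds a column's name and its type,
    // which is either "String" or "Double".
    def schemaFromSpec(spec: List[Map[String, String]]): StructType =
      StructType(spec.map { col =>
        val dataType = col("type") match {
          case "Double" => DoubleType
          case _        => StringType
        }
        StructField(col("name"), dataType, nullable = true)
      })

    val schema = schemaFromSpec(List(
      Map("name" -> "price", "type" -> "Double"),
      Map("name" -> "label", "type" -> "String")
    ))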

I found this on Stack Overflow, but I struggle to understand it, and I wonder whether there isn't an easier way to solve my problem: Dynamically compiling scala class files at runtime in Scala 2.11

Thanks

Upvotes: 3

Views: 2295

Answers (1)

user9946307

Create an implicit conversion from Spark data types to Scala native types.

Then map those types over the StructFields of the DataFrame's schema to generate the source of a matching case class.

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.types._

    val spark = SparkSession
      .builder
      .appName("Movies Reviews")
      .config("spark.master", "local")
      .getOrCreate()

    import spark.implicits._

    val someDF = Seq(
      (8, "bat"),
      (64, "mouse"),
      (-27, "horse")
    ).toDF("number", "word")

    someDF.printSchema()

    // Generate the source of a case class that matches the given schema.
    // Nullable columns become Option fields.
    def schemaCaseClass(schema: StructType, className: String)
                       (implicit sparkTypeScala: DataType => String): String = {
      def structField(col: StructField): String = {
        val scalaType = sparkTypeScala(col.dataType)
        if (col.nullable) s"${col.name}: Option[$scalaType]"
        else s"${col.name}: $scalaType"
      }

      val fields = schema.map(structField).mkString(",\n  ")
      s"""
         |case class $className (
         |  $fields
         |)
      """.stripMargin
    }

    // Map each Spark SQL DataType to the Scala type Spark uses for it.
    // Array, map and struct columns would need their element types filled
    // in for the generated field to compile; they are left generic here.
    implicit val scalaTypes: DataType => String = {
      case _: ByteType      => "Byte"
      case _: ShortType     => "Short"
      case _: IntegerType   => "Int"
      case _: LongType      => "Long"
      case _: FloatType     => "Float"
      case _: DoubleType    => "Double"
      case _: DecimalType   => "java.math.BigDecimal"
      case _: StringType    => "String"
      case _: BinaryType    => "Array[Byte]"
      case _: BooleanType   => "Boolean"
      case _: TimestampType => "java.sql.Timestamp"
      case _: DateType      => "java.sql.Date"
      case _: ArrayType     => "scala.collection.Seq"
      case _: MapType       => "scala.collection.Map"
      case _: StructType    => "org.apache.spark.sql.Row"
      case _                => "String"
    }

    println(schemaCaseClass(someDF.schema, "someDF"))
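If you actually need the class at runtime (as in the question linked above), the generated source can be fed to the Scala 2.11 runtime toolbox. A minimal sketch, assuming the scala-compiler artifact is on the classpath; note that a class compiled this way is still hard to use as a Dataset encoder, since Encoders.product needs a TypeTag available at compile time:

    import scala.reflect.runtime.universe
    import scala.tools.reflect.ToolBox

    // Compile the generated case class definition at runtime.
    val toolbox = universe.runtimeMirror(getClass.getClassLoader).mkToolBox()
    val source = schemaCaseClass(someDF.schema, "SomeDF")
    val compiled = toolbox.compile(toolbox.parse(source))
    compiled() // materializes the definition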

Upvotes: 0
