JimyRyan

Reputation: 359

Spark 2.1: Convert RDD to Dataset with custom columns using toDS() function

I want to transform an RDD into a Dataset with custom columns using the Spark SQL native function toDS().

The code compiles without errors, but at runtime I get the error No Encoder found for java.time.LocalDate.
Below is the full stack trace:

Exception in thread "main" java.lang.UnsupportedOperationException: No Encoder found for java.time.LocalDate
- field (class: "java.time.LocalDate", name: "_1")
- root class: "scala.Tuple3"
    at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:602)
    at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:596)
    at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:587)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
    at scala.collection.immutable.List.flatMap(List.scala:344)
    at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:587)
    at org.apache.spark.sql.catalyst.ScalaReflection$.serializerFor(ScalaReflection.scala:425)
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:71)
    at org.apache.spark.sql.Encoders$.product(Encoders.scala:275)
    at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:49)
    at observatory.Extraction$.locationYearlyAverageRecords(Extraction.scala:114)
    at observatory.Extraction$.processExtraction(Extraction.scala:28)
    at observatory.Main$.delayedEndpoint$observatory$Main$1(Main.scala:18)
    at observatory.Main$delayedInit$body.apply(Main.scala:7)
    at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
    at scala.App$$anonfun$main$1.apply(App.scala:76)
    at scala.App$$anonfun$main$1.apply(App.scala:76)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
    at scala.App$class.main(App.scala:76)
    at observatory.Main$.main(Main.scala:7)
    at observatory.Main.main(Main.scala)

My RDD is composed of three columns, based on a Tuple3 with this signature:

type TemperatureRecord = (LocalDate, Location, Double)

The LocalDate field is the Java object java.time.LocalDate.
The Location field is a custom type made of two Doubles (GPS coordinates), with this signature:

case class Location(lat: Double, lon: Double)

Below, one sample row:

(1975-01-01, Location(70.933,-8.667), -4.888888888888889)
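
For reference, the call that fails is roughly the following (a simplified, untested sketch; the names are illustrative, not the exact code in Extraction.scala):

import java.time.LocalDate
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("observatory").getOrCreate()
import spark.implicits._

val records: RDD[TemperatureRecord] = spark.sparkContext.parallelize(Seq(
  (LocalDate.of(1975, 1, 1), Location(70.933, -8.667), -4.888888888888889)
))

// Compiles, but throws at runtime when the Product encoder is materialized:
// "No Encoder found for java.time.LocalDate"
val ds = records.toDS()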


Some details about my application / environment:


I have read in this article, How to store custom objects in Dataset?, that I need to define a custom Encoder, but I have no idea how to do that :(.
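
From what I understand of that article, one option is a Kryo-based binary encoder. Something like the untested sketch below should avoid the exception (using the records RDD from the snippet above), but then the whole tuple ends up in a single binary column, which is not what I want since I need custom columns:

import java.time.LocalDate
import org.apache.spark.sql.Encoders

// Kryo encoder for the whole tuple: avoids the error, but the resulting
// Dataset has a single "value" column of type binary instead of one column per field
val kryoEncoder = Encoders.kryo[(LocalDate, Location, Double)]
val binaryDs = spark.createDataset(records)(kryoEncoder)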

Upvotes: 2

Views: 1318

Answers (1)

Luis

Reputation: 551

The problem is that Spark does not find an encoder for arbitrary classes. As of Spark 2.1, encoders are only provided for primitive types, a handful of common types such as String, java.sql.Date and java.sql.Timestamp, and case classes/tuples built from them; there is no good support for other custom classes such as java.time.LocalDate.

In your case, since the problematic field represents a date, you can use java.sql.Date instead of java.time.LocalDate. The benefit is that you can take advantage of the encoders Spark already provides.

import java.sql.Date
import spark.implicits._   // needed for .toDS on a local Seq

case class TempRow(date: Date, loc: Location, temp: Double)

val ds = Seq(
  TempRow(Date.valueOf("2017-06-01"), Location(1.4, 5.1), 4.9),
  TempRow(Date.valueOf("2014-04-05"), Location(1.5, 2.5), 5.5)
).toDS

ds.show()

+----------+---------+----+
|      date|      loc|temp|
+----------+---------+----+
|2017-06-01|[1.4,5.1]| 4.9|
|2014-04-05|[1.5,2.5]| 5.5|
+----------+---------+----+

Check the schema:

ds.printSchema()

root
 |-- date: date (nullable = true)
 |-- loc: struct (nullable = true)
 |    |-- lat: double (nullable = false)
 |    |-- lon: double (nullable = false)
 |-- temp: double (nullable = false)
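
If the rest of your code still works with java.time.LocalDate, you only need to convert at the Dataset boundary; both directions are built into java.sql.Date since Java 8:

import java.sql.Date
import java.time.LocalDate

// Convert only when entering / leaving Spark, keep LocalDate everywhere else
val sqlDate: Date        = Date.valueOf(LocalDate.of(1975, 1, 1))  // LocalDate -> java.sql.Date
val localDate: LocalDate = sqlDate.toLocalDate                     // java.sql.Date -> LocalDate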

For more general cases, there is a trick you can use to store most custom classes in a Spark Dataset. Bear in mind that it does not work in every case, because you have to use a String as an intermediate representation of your custom object. I hope this issue gets solved in the future, because it is really a pain.

Below is one solution for your case:

import java.time.LocalDate
import spark.implicits._   // provides the encoder for the (String, Location, Double) tuple

case class Location(i: Double, j: Double)
class TempRecord(val date: LocalDate, val loc: Location, val temp: Double)
type TempSerialized = (String, Location, Double)

// Implicit conversions between the "real" record and its serializable form
implicit def fromSerialized(t: TempSerialized): TempRecord =
  new TempRecord(LocalDate.parse(t._1), t._2, t._3)
implicit def toSerialized(t: TempRecord): TempSerialized =
  (t.date.toString, t.loc, t.temp)

// Finally we can create the Dataset: the TempRecord instances are converted
// to (String, Location, Double) by toSerialized before Spark ever sees them
val d = spark.createDataset(Seq[TempSerialized](
    new TempRecord(LocalDate.now, Location(1.0, 2.0), 3.0),
    new TempRecord(LocalDate.now, Location(5.0, 4.0), 4.0)
  )).toDF("date", "location", "temperature").as[TempSerialized]

d.show()

+----------+---------+-----------+
|      date| location|temperature|
+----------+---------+-----------+
|2017-07-11|[1.0,2.0]|        3.0|
|2017-07-11|[5.0,4.0]|        4.0|
+----------+---------+-----------+

d.printSchema()

root
 |-- date: string (nullable = true)
 |-- location: struct (nullable = true)
 |    |-- i: double (nullable = false)
 |    |-- j: double (nullable = false)
 |-- temperature: double (nullable = false)
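
To get TempRecord objects back out, note that Spark still has no encoder for TempRecord itself, so d.map(fromSerialized) would not compile; the conversion has to happen outside Spark, for example after a collect (a small sketch, reusing the implicits above):

// Rebuild TempRecord instances on the driver via the implicit conversion
val tempRecords: Array[TempRecord] = d.collect().map(t => fromSerialized(t))
tempRecords.foreach(r => println(s"${r.date} ${r.loc} ${r.temp}"))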

Upvotes: 1
