Reputation: 7890
I have a Dataset that I wish to convert to a typed Dataset, where the type is a case class with Option for several of its parameters. For example, in the Spark shell I create a case class, an encoder, and an (untyped) Dataset:
case class Analogue(id: Long, t1: Option[Double] = None, t2: Option[Double] = None)
val df = Seq((1, 34.0), (2,3.4)).toDF("id", "t1")
implicit val analogueChannelEncoder: Encoder[Analogue] = Encoders.product[Analogue]
I want to create a Dataset[Analogue] from df, so I try:
df.as(analogueChannelEncoder)
But this results in the error:
org.apache.spark.sql.AnalysisException: cannot resolve '`t2`' given input columns: [id, t1];
Looking at the schemas of df and analogueChannelEncoder, the difference is apparent:
scala> df.schema
res3: org.apache.spark.sql.types.StructType = StructType(StructField(id,IntegerType,false), StructField(t1,DoubleType,false))
scala> analogueChannelEncoder.schema
res4: org.apache.spark.sql.types.StructType = StructType(StructField(id,LongType,false), StructField(t1,DoubleType,true), StructField(t2,DoubleType,true))
I have seen this answer, but it will not work for me because my Dataset is assembled programmatically rather than loaded straight from a data source.
How can I cast my untyped Dataset[Row] to Dataset[Analogue]?
Upvotes: 0
Views: 801
Reputation: 29165
Your case class:
case class Analogue(id: Long, t1: Option[Double] = None, t2: Option[Double] = None)
Your conversion code:
val encoderSchema = Encoders.product[Analogue].schema
val df1: Dataset[Row] = spark.createDataset(Seq((1, 34.0), (2, 3.4)))
  .map(x => Analogue(x._1, Option(x._2), None))
  .toDF("id", "t1", "t2")
df1.show
df1.printSchema()
encoderSchema.printTreeString()
Result:
+---+----+----+
| id| t1| t2|
+---+----+----+
| 1|34.0|null|
| 2| 3.4|null|
+---+----+----+
root
|-- id: long (nullable = false)
|-- t1: double (nullable = true)
|-- t2: double (nullable = true)
root
|-- id: long (nullable = false)
|-- t1: double (nullable = true)
|-- t2: double (nullable = true)
Now suppose your case class has many fields (for example, 5). If the optional values are None, it works as shown below:
case class Analogue(id: Long, t1: Option[Double] = None, t2: Option[Double] = None, t3: Option[Double] = None, t4: Option[Double] = None, t5: Option[Double] = None)
val encoderSchema = Encoders.product[Analogue].schema
println(encoderSchema.toSeq)
val df1 = spark.createDataset(Seq((1, 34.0), (2, 3.4)))
  .map(x => Analogue(x._1, Option(x._2)))
  .as[Analogue].toDF()
df1.show
df1.printSchema()
encoderSchema.printTreeString()
If you set only the fields that are present, the remaining fields default to None.
StructType(StructField(id,LongType,false), StructField(t1,DoubleType,true), StructField(t2,DoubleType,true), StructField(t3,DoubleType,true), StructField(t4,DoubleType,true), StructField(t5,DoubleType,true))
+---+----+----+----+----+----+
| id| t1| t2| t3| t4| t5|
+---+----+----+----+----+----+
| 1|34.0|null|null|null|null|
| 2| 3.4|null|null|null|null|
+---+----+----+----+----+----+
root
|-- id: long (nullable = false)
|-- t1: double (nullable = true)
|-- t2: double (nullable = true)
|-- t3: double (nullable = true)
|-- t4: double (nullable = true)
|-- t5: double (nullable = true)
root
|-- id: long (nullable = false)
|-- t1: double (nullable = true)
|-- t2: double (nullable = true)
|-- t3: double (nullable = true)
|-- t4: double (nullable = true)
|-- t5: double (nullable = true)
If this does not work for you, please consider the broadcast idea from my comment and work further on it.
Upvotes: 1
Reputation: 7890
I have resolved the issue by inspecting the 'incoming' Dataset[Row] for its columns and comparing them to the columns of Dataset[Analogue]. I use the resulting difference to append new columns to my Dataset[Row] before casting it as Dataset[Analogue].
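The approach described above can be sketched roughly as follows. This is a minimal illustration, not the exact code used: it assumes the columns missing from the incoming DataFrame can simply be appended as null literals cast to the type the encoder's schema expects, after which .as[Analogue] succeeds.

```scala
import org.apache.spark.sql.{DataFrame, Dataset, Encoders, SparkSession}
import org.apache.spark.sql.functions.lit

case class Analogue(id: Long, t1: Option[Double] = None, t2: Option[Double] = None)

object FillMissingColumns {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.master("local[*]").appName("demo").getOrCreate()
    import spark.implicits._

    val df: DataFrame = Seq((1L, 34.0), (2L, 3.4)).toDF("id", "t1")

    // Compare the incoming columns against the target schema
    val targetSchema = Encoders.product[Analogue].schema
    val missing = targetSchema.filterNot(f => df.columns.contains(f.name))

    // Append each missing column as a null literal of the expected type
    val filled = missing.foldLeft(df) { (acc, f) =>
      acc.withColumn(f.name, lit(null).cast(f.dataType))
    }

    // Now the schemas line up and the cast succeeds;
    // the null t2 values surface as None in the case class
    val ds: Dataset[Analogue] = filled.as[Analogue]
    ds.show()

    spark.stop()
  }
}
```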
Upvotes: 0