D-Dᴙum

Reputation: 7890

Convert Dataset<Row> to a Typed Dataset having optional Parameters

I have a Dataset that I wish to convert to a typed Dataset, where the type is a case class with Option for several parameters. For example, using the Spark shell I create a case class, an encoder, and an (untyped) Dataset:

case class Analogue(id: Long, t1: Option[Double] = None, t2: Option[Double] = None)
val df = Seq((1, 34.0), (2, 3.4)).toDF("id", "t1")
implicit val analogueChannelEncoder: Encoder[Analogue] = Encoders.product[Analogue]

I want to create a Dataset<Analogue> from df so I try:

df.as(analogueChannelEncoder)

But this results in the error:

org.apache.spark.sql.AnalysisException: cannot resolve '`t2`' given input columns: [id, t1];

Looking at the schemas of df and analogueChannelEncoder the difference is apparent:

scala> df.schema
res3: org.apache.spark.sql.types.StructType = StructType(StructField(id,IntegerType,false), StructField(t1,DoubleType,false))

scala> analogueChannelEncoder.schema
res4: org.apache.spark.sql.types.StructType = StructType(StructField(id,LongType,false), StructField(t1,DoubleType,true), StructField(t2,DoubleType,true))

I have seen this answer, but it will not work for me as my Dataset is assembled programmatically and is not a straightforward load from a data source.

How can I cast my untyped Dataset<Row> to a Dataset<Analogue>?

Upvotes: 0

Views: 801

Answers (2)

Ram Ghadiyaram

Reputation: 29165

Your case class:

case class Analogue(id: Long, t1: Option[Double] = None, t2: Option[Double] = None)

Your conversion code:

val encoderSchema = Encoders.product[Analogue].schema
val df1: Dataset[Row] = spark.createDataset(Seq((1, 34.0), (2, 3.4)))
  .map(x => Analogue(x._1, Option(x._2), None))
  .toDF("id", "t1", "t2")
df1.show()

df1.printSchema()
encoderSchema.printTreeString()

Result:

+---+----+----+
| id|  t1|  t2|
+---+----+----+
|  1|34.0|null|
|  2| 3.4|null|
+---+----+----+

root
 |-- id: long (nullable = false)
 |-- t1: double (nullable = true)
 |-- t2: double (nullable = true)

root
 |-- id: long (nullable = false)
 |-- t1: double (nullable = true)
 |-- t2: double (nullable = true)

Update (with more optional columns added to the case class):

Assuming your case class has many fields (for example, 5 fields), if the optional values are None it works as shown below. Here is the example:

case class Analogue(
    id: Long,
    t1: Option[Double] = None,
    t2: Option[Double] = None,
    t3: Option[Double] = None,
    t4: Option[Double] = None,
    t5: Option[Double] = None)

val encoderSchema = Encoders.product[Analogue].schema
println(encoderSchema.toSeq)
val df1 = spark.createDataset(Seq((1, 34.0), (2, 3.4)))
  .map(x => Analogue(x._1, Option(x._2)))
  .as[Analogue].toDF()
df1.show()
df1.printSchema()
encoderSchema.printTreeString()

If you set only the fields that are present, the remaining fields are taken as None.

StructType(StructField(id,LongType,false), StructField(t1,DoubleType,true), StructField(t2,DoubleType,true), StructField(t3,DoubleType,true), StructField(t4,DoubleType,true), StructField(t5,DoubleType,true))
+---+----+----+----+----+----+
| id|  t1|  t2|  t3|  t4|  t5|
+---+----+----+----+----+----+
|  1|34.0|null|null|null|null|
|  2| 3.4|null|null|null|null|
+---+----+----+----+----+----+

root
 |-- id: long (nullable = false)
 |-- t1: double (nullable = true)
 |-- t2: double (nullable = true)
 |-- t3: double (nullable = true)
 |-- t4: double (nullable = true)
 |-- t5: double (nullable = true)

root
 |-- id: long (nullable = false)
 |-- t1: double (nullable = true)
 |-- t2: double (nullable = true)
 |-- t3: double (nullable = true)
 |-- t4: double (nullable = true)
 |-- t5: double (nullable = true)

If it does not work this way, please consider the broadcast idea from my comment and work further on it.

Upvotes: 1

D-Dᴙum

Reputation: 7890

I have resolved the issue by inspecting the 'incoming' Dataset<Row> for its columns and comparing them to the columns of a Dataset<Analogue>. I use the resulting difference to append new (null) columns to my Dataset<Row> before casting it as Dataset<Analogue>.
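For anyone else hitting this, a minimal sketch of that approach might look as follows. It assumes the two-column df and the Analogue case class from the question; the intermediate names (expected, missing, widened) are my own, and the id column is additionally cast from Int to Long to match the case class.

```scala
import org.apache.spark.sql.{Dataset, Encoders, SparkSession}
import org.apache.spark.sql.functions.{col, lit}

case class Analogue(id: Long, t1: Option[Double] = None, t2: Option[Double] = None)

object WidenToTyped {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.master("local[1]").appName("widen").getOrCreate()
    import spark.implicits._

    val df = Seq((1, 34.0), (2, 3.4)).toDF("id", "t1")

    // Fields the encoder expects but the incoming DataFrame lacks
    val expected = Encoders.product[Analogue].schema
    val missing  = expected.filterNot(f => df.columns.contains(f.name))

    // Append each missing column as a typed null, and align id with Long
    val widened = missing
      .foldLeft(df) { (acc, f) => acc.withColumn(f.name, lit(null).cast(f.dataType)) }
      .withColumn("id", col("id").cast("long"))

    val typed: Dataset[Analogue] = widened.as[Analogue]
    typed.show()
    spark.stop()
  }
}
```

Because StructType is a Seq[StructField], the set difference falls out of a simple filterNot, and lit(null).cast(...) keeps each appended column's type consistent with the encoder's schema.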

Upvotes: 0
