Pinky

Reputation: 231

Spark Scala: creating a DataFrame from an RDD using Row and schema

I am trying to create a DataFrame from an RDD so that I can write it out as JSON in the following format. A sample of the expected output is shown below:

"1234":[ { loc:'abc', cost1:1.234, cost2:2.3445 }, { loc:'www', cost1:1.534, cost2:6.3445 } ]

I am able to generate the JSON with cost1 and cost2 as Strings, but I want cost1 and cost2 to be doubles. I am getting an error while creating the DataFrame from the RDD using the defined schema; somehow the data is being treated as String instead of Double. Can someone help me get this right? Below is the Scala code of my sample implementation:

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types._

object csv2json {
  // Prints the runtime class of a value (the type parameter itself is erased).
  def f[T](v: T) = v match {
    case _: Int        => "Int"
    case _: String     => "String"
    case _: Float      => "Float"
    case _: Double     => "Double"
    case _: BigDecimal => "BigDecimal"
    case _             => "Unknown"
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").getOrCreate()
    import spark.implicits._

    val input_df = Seq(
      ("12345", "111", "1.34", "2.34"),
      ("123456", "112", "1.343", "2.344"),
      ("1234", "113", "1.353", "2.354"),
      ("1231", "114", "5.343", "6.344")
    ).toDF("item_id", "loc", "cost1", "cost2")
    input_df.show()

    // Build (item_id, node) pairs; note that nodeId is kept as a String here.
    val inputRDD = input_df.rdd.map(data => {
      val nodeObj = scala.collection.immutable.Map(
        "nodeId" -> data(1).toString,
        "soc"    -> data(2).toString.toDouble,
        "mdc"    -> data(3).toString.toDouble)
      (data(0).toString, nodeObj)
    })

    // Collect all node maps that share an item_id into one list.
    val inputRDDAgg = inputRDD.aggregateByKey(scala.collection.mutable.ListBuffer.empty[Any])(
      (nodeAAggreg, costValue) => nodeAAggreg += costValue,
      (nodeAAggreg, costValue) => nodeAAggreg ++ costValue)

    val inputRDDAggRow = inputRDDAgg.map(data => {
      println(data._1 + "and------ " + f(data._1))
      println(data._2 + "and------ " + f(data._2))
      Row(data._1, data._2)
    })

    val innerSchema = ArrayType(MapType(StringType, DoubleType, true))
    val schema: StructType = StructType(Seq(
      StructField(name = "skuId", dataType = StringType),
      StructField(name = "nodes", dataType = innerSchema)))
    val finalJsonDF = spark.createDataFrame(inputRDDAggRow, schema)
    finalJsonDF.show()
  }
}

Below is the exception stacktrace:

java.lang.RuntimeException: Error while encoding: java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Double
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, skuId), StringType), true, false) AS skuId#32
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else mapobjects(MapObjects_loopValue0, MapObjects_loopIsNull0, ObjectType(class java.lang.Object), if (isnull(validateexternaltype(lambdavariable(MapObjects_loopValue0, MapObjects_loopIsNull0, ObjectType(class java.lang.Object), true), MapType(StringType,DoubleType,true)))) null else newInstance(class org.apache.spark.sql.catalyst.util.ArrayBasedMapData), validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 1, nodes), ArrayType(MapType(StringType,DoubleType,true),true)), None) AS nodes#33
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:291)
    at org.apache.spark.sql.SparkSession$$anonfun$4.apply(SparkSession.scala:589)
    at org.apache.spark.sql.SparkSession$$anonfun$4.apply(SparkSession.scala:589)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)

Upvotes: 0

Views: 1110

Answers (2)

Balaji Reddy

Reputation: 5700

I see a schema mismatch in your code. I made a simple fix as a workaround.

I converted data(1).toString to data(1).toString.toDouble. In your ArrayType(MapType(StringType, DoubleType, true)) you declared that all map values are Double, whereas one of your values is a String. I believe that is the issue.

val inputRDD = input_df.rdd.map(data => {
  // Cast nodeId to Double as well, so every map value matches DoubleType.
  val nodeObj = scala.collection.immutable.Map(
    "nodeId" -> data(1).toString.toDouble,
    "soc"    -> data(2).toString.toDouble,
    "mdc"    -> data(3).toString.toDouble)
  (data(0).toString, nodeObj)
})

Output

+------+--------------------------------------------------+
|skuId |nodes                                             |
+------+--------------------------------------------------+
|1231  |[Map(nodeId -> 114.0, soc -> 5.343, mdc -> 6.344)]|
|12345 |[Map(nodeId -> 111.0, soc -> 1.34, mdc -> 2.34)]  |
|123456|[Map(nodeId -> 112.0, soc -> 1.343, mdc -> 2.344)]|
|1234  |[Map(nodeId -> 113.0, soc -> 1.353, mdc -> 2.354)]|
+------+--------------------------------------------------+
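If you actually need nodeId to remain a String, note that a MapType cannot mix value types. A minimal sketch of an alternative (untested against the full pipeline, names taken from the question's code) is to declare each node as a struct instead of a map:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// A struct, unlike a map, allows a different type per field.
val nodeSchema = StructType(Seq(
  StructField("nodeId", StringType),
  StructField("soc", DoubleType),
  StructField("mdc", DoubleType)))
val schema = StructType(Seq(
  StructField("skuId", StringType),
  StructField("nodes", ArrayType(nodeSchema))))

// Each node is then built as a Row instead of a Map:
// Row(data(1).toString, data(2).toString.toDouble, data(3).toString.toDouble)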

Hope this helps!

Upvotes: 0

Ramesh Maharjan

Reputation: 41957

I would suggest you stay with Datasets or DataFrames and use the built-in functions, as they are optimized compared to RDDs.

You can do the following to achieve your requirement:

import org.apache.spark.sql.functions._

// Group by item_id and collect each row's (loc, cost1, cost2) into an array of structs.
val finalJsonDF = input_df
  .groupBy("item_id")
  .agg(
    collect_list(
      struct(col("loc"), col("cost1").cast("double"), col("cost2").cast("double")))
      .as("jsonData"))

where collect_list and struct are built-in functions,

which should give you

+-------+-------------------+
|item_id|jsonData           |
+-------+-------------------+
|123456 |[[112,1.343,2.344]]|
|1234   |[[113,1.353,2.354]]|
|1231   |[[114,5.343,6.344]]|
|12345  |[[111,1.34,2.34]]  |
+-------+-------------------+

and saving the JSON data to a file as per your requirement

finalJsonDF.coalesce(1).write.json("path to output file")

should give you

{"item_id":"123456","jsonData":[{"loc":"112","col2":1.343,"col3":2.344}]}
{"item_id":"1234","jsonData":[{"loc":"113","col2":1.353,"col3":2.354}]}
{"item_id":"1231","jsonData":[{"loc":"114","col2":5.343,"col3":6.344}]}
{"item_id":"12345","jsonData":[{"loc":"111","col2":1.34,"col3":2.34}]}

Upvotes: 0
