Reputation: 231
I am trying to create a DataFrame from an RDD so that I can write it out as JSON in the following format. A sample of the expected output is shown below:
"1234":[ { loc:'abc', cost1:1.234, cost2:2.3445 }, { loc:'www', cost1:1.534, cost2:6.3445 } ]
I am able to generate the JSON with cost1 and cost2 as strings, but I want cost1 and cost2 to be doubles. I get an error while creating the DataFrame from the RDD using the schema I defined; somehow the data is being treated as String instead of Double. Can someone help me get this right? Below is the Scala code of my sample implementation:
object csv2json {

  import org.apache.spark.sql.Row
  import org.apache.spark.sql.types._

  // Helper that reports the runtime type of a value
  def f[T](v: T) = v match {
    case _: Int        => "Int"
    case _: String     => "String"
    case _: Float      => "Float"
    case _: Double     => "Double"
    case _: BigDecimal => "BigDecimal"
    case _             => "Unknown"
  }

  def main(args: Array[String]): Unit = {
    import org.apache.spark.sql.SparkSession
    val spark = SparkSession.builder().master("local").getOrCreate()
    import spark.implicits._

    val input_df = Seq(
      ("12345", "111", "1.34", "2.34"),
      ("123456", "112", "1.343", "2.344"),
      ("1234", "113", "1.353", "2.354"),
      ("1231", "114", "5.343", "6.344")
    ).toDF("item_id", "loc", "cost1", "cost2")
    input_df.show()

    // Build (item_id, Map) pairs; nodeId is kept as a String here
    val inputRDD = input_df.rdd.map(data => {
      val nodeObj = scala.collection.immutable.Map(
        "nodeId" -> data(1).toString(),
        "soc"    -> data(2).toString().toDouble,
        "mdc"    -> data(3).toString().toDouble)
      (data(0).toString(), nodeObj)
    })

    // Collect all node maps for the same key into one list
    val inputRDDAgg = inputRDD.aggregateByKey(scala.collection.mutable.ListBuffer.empty[Any])(
      (nodeAAggreg, costValue) => nodeAAggreg += costValue,
      (nodeAAggreg, costValue) => nodeAAggreg ++ costValue)

    val inputRDDAggRow = inputRDDAgg.map(data => {
      println(data._1 + "and------ " + f(data._1))
      println(data._2 + "and------ " + f(data._2))
      val skuObj = Row(data._1, data._2)
      skuObj
    })

    val innerSchema = ArrayType(MapType(StringType, DoubleType, true))
    val schema: StructType = StructType(Seq(
      StructField("skuId", StringType),
      StructField("nodes", innerSchema)))

    val finalJsonDF = spark.createDataFrame(inputRDDAggRow, schema)
    finalJsonDF.show()
  }
}
Below is the exception stacktrace:
java.lang.RuntimeException: Error while encoding: java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Double
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, skuId), StringType), true, false) AS skuId#32
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else mapobjects(MapObjects_loopValue0, MapObjects_loopIsNull0, ObjectType(class java.lang.Object), if (isnull(validateexternaltype(lambdavariable(MapObjects_loopValue0, MapObjects_loopIsNull0, ObjectType(class java.lang.Object), true), MapType(StringType,DoubleType,true)))) null else newInstance(class org.apache.spark.sql.catalyst.util.ArrayBasedMapData), validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 1, nodes), ArrayType(MapType(StringType,DoubleType,true),true)), None) AS nodes#33
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:291)
at org.apache.spark.sql.SparkSession$$anonfun$4.apply(SparkSession.scala:589)
at org.apache.spark.sql.SparkSession$$anonfun$4.apply(SparkSession.scala:589)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
Upvotes: 0
Views: 1110
Reputation: 5700
There is a schema mismatch in your code. As a simple workaround, I converted data(1).toString to data(1).toString.toDouble. In your ArrayType(MapType(StringType, DoubleType, true)) you declare that all of the map values are Double, whereas one of your values (nodeId) is a String. I believe that is the issue.
val inputRDD = input_df.rdd.map(data => {
  val nodeObj = scala.collection.immutable.Map(
    "nodeId" -> data(1).toString.toDouble,
    "soc"    -> data(2).toString().toDouble,
    "mdc"    -> data(3).toString().toDouble)
  (data(0).toString(), nodeObj)
})
Output
+------+--------------------------------------------------+
|skuId |nodes |
+------+--------------------------------------------------+
|1231 |[Map(nodeId -> 114.0, soc -> 5.343, mdc -> 6.344)]|
|12345 |[Map(nodeId -> 111.0, soc -> 1.34, mdc -> 2.34)] |
|123456|[Map(nodeId -> 112.0, soc -> 1.343, mdc -> 2.344)]|
|1234 |[Map(nodeId -> 113.0, soc -> 1.353, mdc -> 2.354)]|
+------+--------------------------------------------------+
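If you need nodeId to stay a String, the map-based schema will not work, because a MapType has a single value type for every entry. A minimal, untested sketch of one alternative (assuming Spark 2.x and the same input_df as above) is to describe the inner object with a StructType instead of a MapType, so each field keeps its own type:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// Inner object as a struct: nodeId stays a String, soc/mdc are Doubles
val nodeSchema = StructType(Seq(
  StructField("nodeId", StringType),
  StructField("soc", DoubleType),
  StructField("mdc", DoubleType)))

val schema = StructType(Seq(
  StructField("skuId", StringType),
  StructField("nodes", ArrayType(nodeSchema))))

// Group the rows per item_id and wrap each group as Row(skuId, Seq[Row])
val rowRDD = input_df.rdd
  .map(r => (r.getString(0), Row(r.getString(1), r.getString(2).toDouble, r.getString(3).toDouble)))
  .groupByKey()
  .map { case (skuId, nodes) => Row(skuId, nodes.toSeq) }

val finalJsonDF = spark.createDataFrame(rowRDD, schema)
finalJsonDF.show(false)

With the struct, the JSON written from this DataFrame should carry nodeId as a string and soc/mdc as doubles.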
Hope this helps!
Upvotes: 0
Reputation: 41957
I would suggest you stay with the Dataset/DataFrame API and use inbuilt functions, as they are more optimized than RDDs.
You can do the following to achieve your requirement:
import org.apache.spark.sql.functions._
val finalJsonDF = input_df
  .groupBy("item_id")
  .agg(
    collect_list(
      struct(col("loc"), col("cost1").cast("double"), col("cost2").cast("double")))
      .as("jsonData"))
where collect_list and struct are inbuilt functions, which should give you
+-------+-------------------+
|item_id|jsonData |
+-------+-------------------+
|123456 |[[112,1.343,2.344]]|
|1234 |[[113,1.353,2.354]]|
|1231 |[[114,5.343,6.344]]|
|12345 |[[111,1.34,2.34]] |
+-------+-------------------+
and saving the jsonData to a JSON file, as your requirement asks, with
finalJsonDF.coalesce(1).write.json("path to output file")
should give you
{"item_id":"123456","jsonData":[{"loc":"112","col2":1.343,"col3":2.344}]}
{"item_id":"1234","jsonData":[{"loc":"113","col2":1.353,"col3":2.354}]}
{"item_id":"1231","jsonData":[{"loc":"114","col2":5.343,"col3":6.344}]}
{"item_id":"12345","jsonData":[{"loc":"111","col2":1.34,"col3":2.34}]}
Upvotes: 0