Reputation: 341
I am trying to collect a dataset in JSON format
val df = spark.sql("select invn_ctl_nbr,cl_id,department from pi_prd.table1 where batch_run_dt='20190101' and batchid = '20190101001' limit 10").toJSON.rdd
The generated result is in the format of Array[String]:
Array({"invn_ctl_nbr":"1","cl_id":"AK","department":"Dept1"},
{"invn_ctl_nbr":"2","cl_id":"AF","department":"Dept1"},
{"invn_ctl_nbr":"3","cl_id":"AF","department":"Dept2"},
{"invn_ctl_nbr":"4","cl_id":"AK","department":"Dept3"})
Next, I am trying to group the data so that the result is in the format below:
Map<key, List<data>>
To give an example:
Map<AK=[{"invn_ctl_nbr":"1","cl_id":"AK","department":"Dept1"},{"invn_ctl_nbr":"4","cl_id":"AK","department":"Dept3"}],AF=[{"invn_ctl_nbr":"2","cl_id":"AF","department":"Dept1"},{"invn_ctl_nbr":"3","cl_id":"AF","department":"Dept2"}]>
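For reference, the grouping step itself can be sketched in plain Scala (independent of Spark), assuming the key is pulled out of each JSON string with a simple regex; the field names and rows mirror the sample data above:

```scala
// Sample rows matching the Array[String] output above.
val rows = Array(
  """{"invn_ctl_nbr":"1","cl_id":"AK","department":"Dept1"}""",
  """{"invn_ctl_nbr":"2","cl_id":"AF","department":"Dept1"}""",
  """{"invn_ctl_nbr":"3","cl_id":"AF","department":"Dept2"}""",
  """{"invn_ctl_nbr":"4","cl_id":"AK","department":"Dept3"}"""
)

// Extract the cl_id value from each JSON string (a quick regex
// stand-in for proper JSON parsing, used here only for illustration).
val KeyPattern = """"cl_id":"(\w+)"""".r

// Group the raw JSON strings by their extracted key.
val grouped: Map[String, List[String]] =
  rows.toList.groupBy(r => KeyPattern.findFirstMatchIn(r).map(_.group(1)).getOrElse(""))
```

Here `grouped` ends up as a `Map[String, List[String]]` keyed by `cl_id`, with each value holding the original JSON strings for that key.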
I have already tried a solution provided in the link. The code I used to fetch the required result was:
import org.json4s._
import org.json4s.jackson.Serialization.read
case class cC(invn_ctl_nbr: String, cl_id: String, department: String)
val rdd_new = df.map { m =>
  implicit val formats = DefaultFormats
  val parsedObj = read[cC](m)
  (parsedObj.cl_id, m)
}
rdd_new.collect.groupBy(_._1).map(m => (m._1,m._2.map(_._2).toList))
but it's giving me below error:
org.json4s.package$MappingException: Parsed JSON values do not match with class constructor
args=
arg types=
constructor=public cC($iw,java.lang.String,java.lang.String,java.lang.String)
My mapping matches what I have specified in the case class and what is coming in from the RDD, so I am not sure what exactly I am missing here. Can anybody please help resolve this? It would be of great help. Thank you.
Upvotes: 0
Views: 629
Reputation: 23119
You can use groupBy with struct, to_json and collect_list to get the result you wanted.
The dataframe df here is equivalent to your spark.sql("select ...") query:
import org.apache.spark.sql.functions.{col, collect_list, struct, to_json}
import spark.implicits._

val df = Seq(
  ("1", "AK", "Dept1"),
  ("2", "AF", "Dept1"),
  ("3", "AF", "Dept2"),
  ("4", "AK", "Dept3")
).toDF("invn_ctl_nbr", "cl_id", "department")

val result = df.groupBy($"cl_id")
  .agg(to_json(collect_list(struct(df.columns.map(col(_)) : _*))))
  .rdd.map(x => (x.getString(0), x.get(1))).collectAsMap()
Output (result):
Map(AF -> [{"invn_ctl_nbr":"2","cl_id":"AF","department":"Dept1"},{"invn_ctl_nbr":"3","cl_id":"AF","department":"Dept2"}], AK -> [{"invn_ctl_nbr":"1","cl_id":"AK","department":"Dept1"},{"invn_ctl_nbr":"4","cl_id":"AK","department":"Dept3"}])
Hope this helps!
Upvotes: 3