BAE

Reputation: 8956

Spark: how to merge rows into an array of JSON objects

Input:

id1   id2    name   value           epid
"xxx" "yyy"  "EAN"  "5057723043"    "1299"
"xxx" "yyy"  "MPN"  "EVBD"          "1299"

I want:

{
  "id1": "xxx",
  "id2": "yyy",
  "item_specifics": [
    {
      "name": "EAN",
      "value": "5057723043"
    },
    {
      "name": "MPN",
      "value": "EVBD"
    },
    {
      "name": "EPID",
      "value": "1299"
    }
  ]
}

I tried the following two solutions from How to aggregate columns into json array? and how to merge rows into column of spark dataframe as vaild json to write it in mysql:

pi_df.groupBy(col("id1"), col("id2"))
  //.agg(collect_list(to_json(struct(col("name"), col("value"))).alias("item_specifics"))) // => not working
  .agg(collect_list(struct(col("name"),col("value"))).alias("item_specifics"))

But I got:

{ "name":"EAN","value":"5057723043", "EPID": "1299", "id1": "xxx", "id2": "yyy" }

How to fix this? Thanks

Upvotes: 0

Views: 3451

Answers (3)

Apurba Pandey

Reputation: 1076

For Spark < 2.4

You can create two DataFrames: one with name and value, and another with "EPID" as the name and the epid value as the value, then union them together. Aggregate with collect_set and build the JSON. The code should look like this:

//Creating Test Data
val df = Seq(("xxx","yyy" ,"EAN" ,"5057723043","1299"), ("xxx","yyy" ,"MPN" ,"EVBD", "1299") )
  .toDF("id1", "id2", "name", "value", "epid")

df.show(false)

+---+---+----+----------+----+
|id1|id2|name|value     |epid|
+---+---+----+----------+----+
|xxx|yyy|EAN |5057723043|1299|
|xxx|yyy|MPN |EVBD      |1299|
+---+---+----+----------+----+

val df1 = df.withColumn("map", struct(col("name"), col("value")))
  .select("id1", "id2", "map")

val df2 = df.withColumn("map", struct(lit("EPID").as("name"), col("epid").as("value")))
  .select("id1", "id2", "map")

val jsonDF = df1.union(df2).groupBy("id1", "id2")
  .agg(collect_set("map").as("item_specifics"))
  .withColumn("json", to_json(struct("id1", "id2", "item_specifics")))

jsonDF.select("json").show(false)

+---------------------------------------------------------------------------------------------------------------------------------------------+
|json                                                                                                                                         |
+---------------------------------------------------------------------------------------------------------------------------------------------+
|{"id1":"xxx","id2":"yyy","item_specifics":[{"name":"MPN","value":"EVBD"},{"name":"EAN","value":"5057723043"},{"name":"EPID","value":"1299"}]}|
+---------------------------------------------------------------------------------------------------------------------------------------------+
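
Note that collect_set deduplicates and does not guarantee element order, which is why the EPID entry can land at a different position in the array than in the desired output.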

For Spark >= 2.4

Spark 2.4 adds an array_union method, which might help avoid the explicit union. I haven't tried it, though:

val jsonDF = df.withColumn("map1", struct(col("name"), col("value")))
  .withColumn("map2", struct(lit("EPID").as("name"), col("epid").as("value")))
  .groupBy("id1", "id2")
  .agg(collect_set("map1").as("item_specifics1"),
    collect_set("map2").as("item_specifics2"))
  .withColumn("item_specifics", array_union(col("item_specifics1"), col("item_specifics2")))
  .withColumn("json", to_json(struct("id1", "id2", "item_specifics")))

Upvotes: 5

m-bhole

Reputation: 1189

Here is what you need to do:

import scala.util.parsing.json.JSONObject
import scala.collection.mutable.WrappedArray

// Define a UDF that assembles the final JSON string
val jsonFun = udf((id1: String, id2: String, item_specifics: WrappedArray[Map[String, String]], epid: String) => {
  // Append epid to the item_specifics maps
  val item_withEPID = item_specifics :+ Map("epid" -> epid)

  // Render each map as a {"name": ..., "value": ...} object and join into a JSON array
  val item_specificsArray = item_withEPID
    .map(m => Map("name" -> m.keys.toSeq(0), "value" -> m.values.toSeq(0)))
    .map(mi => JSONObject(mi).toString().replace("\\", "")) // strip escaping from nested rendering
    .mkString("[", ",", "]")

  // Add id1 and id2 to the output JSON; .toSeq keeps JSONObject from
  // re-quoting the already-serialized array string
  val m = Map("id1" -> id1, "id2" -> id2, "item_specifics" -> item_specificsArray.toSeq)
  JSONObject(m).toString().replace("\\", "")
})

val pi_df = Seq( ("xxx","yyy","EAN","5057723043","1299"), ("xxx","yyy","MPN","EVBD","1299")).toDF("id1","id2","name","value","epid")

// epid has to be part of the groupBy; otherwise the column is not available after aggregation
val df = pi_df
  .groupBy(col("id1"), col("id2"), col("epid"))
  .agg(collect_list(map(col("name"), col("value"))).as("item_specifics"))
  .withColumn("item_specifics", jsonFun($"id1", $"id2", $"item_specifics", $"epid"))

df.show(false)
+---+---+----+--------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|id1|id2|epid|item_specifics                                                                                                                                                      |
+---+---+----+--------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|xxx|yyy|1299|{"id1" : "xxx", "id2" : "yyy", "item_specifics" : [{"name" : "MPN", "value" : "EVBD"},{"name" : "EAN", "value" : "5057723043"},{"name" : "epid", "value" : "1299"}]}|
+---+---+----+--------------------------------------------------------------------------------------------------------------------------------------------------------------------+

Content of the item_specifics column (pretty-printed):

{
    "id1": "xxx",
    "id2": "yyy",
    "item_specifics": [{
        "name": "MPN",
        "value": "EVBD"
    }, {
        "name": "EAN",
        "value": "5057723043"
    }, {
        "name": "epid",
        "value": "1299"
    }]
}

Upvotes: 0

ayplam

Reputation: 1963

You're pretty close. I believe you're looking for something like this:

val pi_df2 = pi_df.withColumn("name", lit("EPID"))
  .drop("value")                // avoid a duplicate "value" column after the rename
  .withColumnRenamed("epid", "value")
  .select("id1", "id2", "name", "value")
  .distinct                     // both input rows carry the same epid; keep one EPID entry

pi_df.select("id1", "id2", "name", "value")
  .union(pi_df2)
  .withColumn("item_specific", struct(col("name"), col("value")))
  .groupBy(col("id1"), col("id2"))
  .agg(collect_list(col("item_specific")).alias("item_specifics"))
  .write.json(...)

The union brings epid back into item_specifics.
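
If you want a single JSON string per group (matching the shape in the question) rather than JSON files, a minimal sketch, assuming the aggregation above is bound to a hypothetical name `result` instead of being written out directly:

// Hypothetical: `result` holds the aggregation above, minus the write.json(...) call
result
  .select(to_json(struct(col("id1"), col("id2"), col("item_specifics"))).as("json"))
  .show(false)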

Upvotes: 0
