Hannah
Hannah

Reputation: 163

Format a Dataframe into a nested json in spark scala

I have a dataframe df_original like below enter image description here

I want to convert it into a nested json format like below

enter image description here

So far I hv done this

val df_original =data.groupBy($"unique_id").agg(collect_set(struct($"acct_no",$"ciskey")).as("accounts"))
val data1 = data.groupBy($"unique_id").agg(collect_set(struct($"acct_no",$"ciskey")).as("accounts"))
val resultDf = df_original.join(data1, Seq("unique_id")).dropDuplicates()

which generates the below json

{
  "unique_id": "12345678",
  "transaction_status": "posted",
  "amount": "116.26",
  "category": "Family",
  "email_id": "[email protected]",
  "acct_no": "51663",
  "ciskey": "47626220",
  "accounts": [
    {
      "acct_no": "51663",
      "ciskey": "47626220"
    },
    {
      "acct_no": "51663",
      "ciskey": "47626221"
    }, 
    {
      "acct_no": "51663",
      "ciskey": "47626222"
    }

  ]
}

Please help me to move forward

Upvotes: 0

Views: 388

Answers (2)

Som
Som

Reputation: 6323

another alternative-

Load the test data

  val data =
      """
        |transaction_status|amount|category|email_id      |unique_id|acct_no|ciskey
        |posted            |116.26|Family  |[email protected]|12345678 |51663  |47626220
        |posted            |116.26|Family  |[email protected]|12345678 |51663  |47626221
        |posted            |116.26|Family  |[email protected]|12345678 |51663  |47626222
      """.stripMargin
    val stringDS = data.split(System.lineSeparator())
      .map(_.split("\\|").map(_.replaceAll("""^[ \t]+|[ \t]+$""", "")).mkString(","))
      .toSeq.toDS()
    val df = spark.read
      .option("sep", ",")
      .option("inferSchema", "true")
      .option("header", "true")
      .option("nullValue", "null")
      .csv(stringDS)

    df.show(false)
    df.printSchema()
    /**
      * +------------------+------+--------+--------------+---------+-------+--------+
      * |transaction_status|amount|category|email_id      |unique_id|acct_no|ciskey  |
      * +------------------+------+--------+--------------+---------+-------+--------+
      * |posted            |116.26|Family  |[email protected]|12345678 |51663  |47626220|
      * |posted            |116.26|Family  |[email protected]|12345678 |51663  |47626221|
      * |posted            |116.26|Family  |[email protected]|12345678 |51663  |47626222|
      * +------------------+------+--------+--------------+---------+-------+--------+
      *
      * root
      * |-- transaction_status: string (nullable = true)
      * |-- amount: double (nullable = true)
      * |-- category: string (nullable = true)
      * |-- email_id: string (nullable = true)
      * |-- unique_id: integer (nullable = true)
      * |-- acct_no: integer (nullable = true)
      * |-- ciskey: integer (nullable = true)
      */

create required json

    val groupBy = df.columns.filter(_!="ciskey")
    df.groupBy(groupBy.map(col): _*).agg(collect_list($"ciskey").as("accounts"))
      .withColumn("ciskey", element_at($"accounts", 1) )
      .withColumn("customers", expr("TRANSFORM(accounts, " +
        "x -> named_struct('ciskey_no', x, 'ciskey_val', 'IND'))"))
      .withColumn("accounts",
        struct($"acct_no", $"customers"))
      .drop("customers")
      .toJSON
      .show(false)

    /**
      * +-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
      * |value                                                                                                                                                                                                                                                                                                                          |
      * +-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
      * |{"transaction_status":"posted","amount":116.26,"category":"Family","email_id":"[email protected]","unique_id":12345678,"acct_no":51663,"accounts":{"acct_no":51663,"customers":[{"ciskey_no":47626220,"ciskey_val":"IND"},{"ciskey_no":47626221,"ciskey_val":"IND"},{"ciskey_no":47626222,"ciskey_val":"IND"}]},"ciskey":47626220}|
      * +-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
      */

Json-

{
    "transaction_status": "posted",
    "amount": 116.26,
    "category": "Family",
    "email_id": "[email protected]",
    "unique_id": 12345678,
    "acct_no": 51663,
    "accounts": {
        "acct_no": 51663,
        "customers": [{
            "ciskey_no": 47626220,
            "ciskey_val": "IND"
        }, {
            "ciskey_no": 47626221,
            "ciskey_val": "IND"
        }, {
            "ciskey_no": 47626222,
            "ciskey_val": "IND"
        }]
    },
    "ciskey": 47626220
}

Upvotes: 1

s.polam
s.polam

Reputation: 10362

Check below code.

scala> df.show(false)
+------------------+------+--------+--------------+---------+-------+--------+
|transaction_status|amount|category|email_id      |unique_id|acct_no|ciskey  |
+------------------+------+--------+--------------+---------+-------+--------+
|posted            |116.26|Family  |[email protected]|12345678 |51663  |47626220|
|posted            |116.26|Family  |[email protected]|12345678 |51663  |47626221|
|posted            |116.26|Family  |[email protected]|12345678 |51663  |47626222|
+------------------+------+--------+--------------+---------+-------+--------+
scala> 

df
.groupBy($"unique_id")
.agg(
    collect_set(
        struct(
            $"transaction_status",
            $"amount",
            $"category",
            $"email_id",
            $"unique_id",
            $"acct_no"
        )).as("json_data"),
    first($"ciskey").as("ciskey"),
    first("acct_no").as("acct_no"),
    collect_list(struct($"ciskey")).as("customers")
)
.withColumn("json_data",explode($"json_data"))
.withColumn("accounts",struct($"acct_no",$"customers"))
.select($"json_data.*",$"ciskey",$"accounts")
.toJSON
.show(false)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|value                                                                                                                                                                                                                                                                      |
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|{"transaction_status":"posted","amount":116.26,"category":"Family","email_id":"[email protected]","unique_id":"12345678","acct_no":"51663","ciskey":"47626220","accounts":{"acct_no":"51663","customers":[{"ciskey":"47626220"},{"ciskey":"47626221"},{"ciskey":"47626222"}]}}|
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

Above code generate data like below, may be you can add logic on top of this.

{
  "transaction_status": "posted",
  "amount": 116.26,
  "category": "Family",
  "email_id": "[email protected]",
  "unique_id": "12345678",
  "acct_no": "51663",
  "ciskey": "47626220",
  "accounts": {
    "acct_no": "51663",
    "customers": [
      {
        "ciskey": "47626220"
      },
      {
        "ciskey": "47626221"
      },
      {
        "ciskey": "47626222"
      }
    ]
  }
}

Upvotes: 1

Related Questions