shakachuk

Reputation: 59

How to convert datasets from CSV to JSON with nested elements?

I have the following CSV file:

LID,Name,age,CID
122,David,29,ECB4
122,Frank,31,ECB4
567,David,29,ECB4
567,Daniel,35,ECB4

I want to group the data first by CID and then by LID, and save the result as JSON with roughly this structure:

{
  "CID": "ECB4",
  "logs": [
    {
      "LID": 122,
      "body": [
        { "name": "David", "age": 29 },
        { "name": "Frank", "age": 31 }
      ]
    },
    {
      "LID": 567,
      "body": [
        { "name": "David", "age": 29 },
        { "name": "Daniel", "age": 35 }
      ]
    }
  ]
}

I have already defined a schema and loaded the data into a dataframe:

 sparkSession.sqlContext.read.format("csv")
  .option("delimiter",",").schema(someSchema).load("...")

But I have no idea how to group the DataFrame in the desired way. The groupBy function returns a RelationalGroupedDataset, which I cannot save as JSON, and a SQL query requires an aggregation after grouping.

I would appreciate any help.

Upvotes: 0

Views: 1651

Answers (1)

Jacek Laskowski

Reputation: 74789

groupBy only defines the groupings; you then have to run an aggregation on them. To save the result in JSON format, you have to define the final aggregation that acts on those groupings.

groupBy(col1: String, cols: String*): RelationalGroupedDataset Groups the Dataset using the specified columns, so that we can run aggregation on them.

See RelationalGroupedDataset for all the available aggregate functions.

In other words, you have to execute an aggregation through the RelationalGroupedDataset interface, for example with the most generic agg operator.

agg(expr: Column, exprs: Column*): DataFrame Compute aggregates by specifying a series of aggregate columns.
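
Schematically (the names below are just placeholders, not from your dataset):

// groupBy alone gives a RelationalGroupedDataset; agg turns it back into a DataFrame you can save
val grouped = someDataset.groupBy("keyColumn")                          // RelationalGroupedDataset
val aggregated = grouped.agg(collect_list("valueColumn") as "values")   // DataFrame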


If I'm not mistaken (judging by the desired JSON output), you want the groupBy to collect the name and age fields per LID.

You should do the following then:

// Load your dataset
val cids = spark.read.option("header", true).csv("cids.csv")
scala> cids.show
+---+------+---+----+
|LID|  Name|age| CID|
+---+------+---+----+
|122| David| 29|ECB4|
|122| Frank| 31|ECB4|
|567| David| 29|ECB4|
|567|Daniel| 35|ECB4|
+---+------+---+----+

With the dataset loaded, you first have to combine the name and age columns into a struct, since they are what gets collected in the aggregation.

val name_ages = cids.withColumn("name_age", struct("name", "age"))
scala> name_ages.show
+---+------+---+----+-----------+
|LID|  Name|age| CID|   name_age|
+---+------+---+----+-----------+
|122| David| 29|ECB4| [David,29]|
|122| Frank| 31|ECB4| [Frank,31]|
|567| David| 29|ECB4| [David,29]|
|567|Daniel| 35|ECB4|[Daniel,35]|
+---+------+---+----+-----------+

Now, it should be fairly straightforward.

val logs = name_ages.groupBy("CID", "LID")
  .agg(collect_list("name_age") as "logs") // <-- that's the missing piece in the puzzle
scala> logs.show(truncate = false)
+----+---+-------------------------+
|CID |LID|logs                     |
+----+---+-------------------------+
|ECB4|122|[[David,29], [Frank,31]] |
|ECB4|567|[[David,29], [Daniel,35]]|
+----+---+-------------------------+

Save away...(left as a home exercise :))


Hint: You may want to use struct once more.
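
A sketch of what that could look like (following the hint; the output path is just an example):

import org.apache.spark.sql.functions._

val result = logs
  .withColumn("log", struct(col("LID"), col("logs") as "body")) // struct once more; the inner list becomes "body"
  .groupBy("CID")
  .agg(collect_list("log") as "logs")

// writes one JSON document per line
result.write.json("output")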

Upvotes: 4
