Reputation: 59
I have the following CSV file:
LID,Name,age,CID
122,David,29,ECB4
122,Frank,31,ECB4
567,David,29,ECB4
567,Daniel,35,ECB4
I want to group the data first by CID and then by LID, and save the result as JSON with roughly this structure:
{
  "CID": "ECB4",
  "logs": [
    {
      "LID": 122,
      "body": [
        {
          "name": "David",
          "age": 29
        },
        {
          "name": "Frank",
          "age": 31
        }
      ]
    },
    {
      "LID": 567,
      "body": [
        {
          "name": "David",
          "age": 29
        },
        {
          "name": "Daniel",
          "age": 35
        }
      ]
    }
  ]
}
I have already defined a schema and loaded the data into a dataframe:
sparkSession.sqlContext.read.format("csv")
.option("delimiter",",").schema(someSchema).load("...")
But I have no idea how to group the dataframe in the way I want. The groupBy function returns a RelationalGroupedDataset, which I cannot save as JSON. A SQL query requires an aggregation after grouping.
I would appreciate any help.
Upvotes: 0
Views: 1651
Reputation: 74789
groupBy only defines the groupings, which you can later run aggregations on. To save the result in JSON format, you have to define the aggregation that acts on those groupings and turns them back into a DataFrame.
groupBy(col1: String, cols: String*): RelationalGroupedDataset
Groups the Dataset using the specified columns, so that we can run aggregation on them.
See RelationalGroupedDataset for all the available aggregate functions.
In other words, you have to run the aggregation through the RelationalGroupedDataset interface, whose most generic operator is agg.
agg(expr: Column, exprs: Column*): DataFrame
Compute aggregates by specifying a series of aggregate columns.
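To make the groupBy/agg relationship concrete, here is a minimal sketch. It assumes a local SparkSession and builds the sample rows in memory instead of reading the CSV; the names people, grouped, stats and the output column aliases are made up for illustration.

```scala
import org.apache.spark.sql.{DataFrame, RelationalGroupedDataset, SparkSession}
import org.apache.spark.sql.functions.{avg, col, count}

// Assumption: a local session; inside spark-shell getOrCreate() reuses the existing one.
val spark = SparkSession.builder().master("local[*]").appName("agg-sketch").getOrCreate()
import spark.implicits._

// Same rows as the CSV in the question, built in memory for the sketch.
val people = Seq(
  (122, "David", 29, "ECB4"),
  (122, "Frank", 31, "ECB4"),
  (567, "David", 29, "ECB4"),
  (567, "Daniel", 35, "ECB4")
).toDF("LID", "Name", "age", "CID")

// groupBy alone gives a RelationalGroupedDataset, which cannot be shown or written.
val grouped: RelationalGroupedDataset = people.groupBy("LID")

// agg turns the grouping back into a DataFrame, one row per LID.
val stats: DataFrame = grouped.agg(
  count(col("Name")).as("people"),
  avg(col("age")).as("avg_age")
)

stats.show()
```

Only the result of agg is a DataFrame again, so that is the point at which show or write become available.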
If I'm not mistaken (judging by the output JSON you want), you use groupBy to collect the name and age fields per LID.
You should do the following then:
// Load your dataset
val cids = spark.read.option("header", true).csv("cids.csv")
scala> cids.show
+---+------+---+----+
|LID| Name|age| CID|
+---+------+---+----+
|122| David| 29|ECB4|
|122| Frank| 31|ECB4|
|567| David| 29|ECB4|
|567|Daniel| 35|ECB4|
+---+------+---+----+
With the dataset loaded, you first have to combine the name and age columns with struct, as they take part in the aggregation together.
val name_ages = cids.withColumn("name_age", struct("name", "age"))
scala> name_ages.show
+---+------+---+----+-----------+
|LID| Name|age| CID| name_age|
+---+------+---+----+-----------+
|122| David| 29|ECB4| [David,29]|
|122| Frank| 31|ECB4| [Frank,31]|
|567| David| 29|ECB4| [David,29]|
|567|Daniel| 35|ECB4|[Daniel,35]|
+---+------+---+----+-----------+
Now, it should be fairly straightforward.
val logs = name_ages.groupBy("CID", "LID")
.agg(collect_list("name_age") as "logs") // <-- that's the missing piece in the puzzle
scala> logs.show(truncate = false)
+----+---+-------------------------+
|CID |LID|logs |
+----+---+-------------------------+
|ECB4|122|[[David,29], [Frank,31]] |
|ECB4|567|[[David,29], [Daniel,35]]|
+----+---+-------------------------+
Save away...(left as a home exercise :))
Hint: You may want to use struct once more.
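For anyone who wants to check their solution, one possible way to finish along the lines of the hint is sketched below. It is not necessarily what the answer had in mind. Assumptions: a local SparkSession, the sample rows built in memory with numeric LID and age (reading the CSV with only the header option would give strings), the inner list named body to match the JSON in the question, and a made-up output directory cids_json.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, collect_list, struct}

// Assumption: a local session; inside spark-shell getOrCreate() reuses the existing one.
val spark = SparkSession.builder().master("local[*]").appName("nested-json-sketch").getOrCreate()
import spark.implicits._

// Same rows as the CSV in the question, built in memory for the sketch.
val cids = Seq(
  (122, "David", 29, "ECB4"),
  (122, "Frank", 31, "ECB4"),
  (567, "David", 29, "ECB4"),
  (567, "Daniel", 35, "ECB4")
).toDF("LID", "Name", "age", "CID")

// Inner level: one row per (CID, LID), people collected into "body".
val perLid = cids
  .groupBy("CID", "LID")
  .agg(collect_list(struct(col("Name").as("name"), col("age"))).as("body"))

// Outer level: struct once more to pair LID with its body, then collect per CID.
val perCid = perLid
  .groupBy("CID")
  .agg(collect_list(struct(col("LID"), col("body"))).as("logs"))

// Writes one JSON object per line; "cids_json" is a hypothetical output directory.
perCid.write.mode("overwrite").json("cids_json")
```

Note that collect_list does not guarantee the order of the collected elements, and that write.json produces a directory of part files with one JSON object per line rather than a single pretty-printed document.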
Upvotes: 4