robynico

Reputation: 236

How to aggregate columns into a JSON array?

How can I transform the data below so that it can be stored in Elasticsearch?

Here is a dataset of beans that I would like to aggregate by product into a JSON array.

List<Bean> data = new ArrayList<Bean>();
data.add(new Bean("book","John",59));
data.add(new Bean("book","Björn",61));
data.add(new Bean("tv","Roger",36));
Dataset<Row> ds = spark.createDataFrame(data, Bean.class);

ds.show(false);

+------+-------+---------+
|amount|product|purchaser|
+------+-------+---------+
|59    |book   |John     |
|61    |book   |Björn    |
|36    |tv     |Roger    |
+------+-------+---------+


ds = ds.groupBy(col("product")).agg(collect_list(map(ds.col("purchaser"),ds.col("amount")).as("map")));
ds.show(false);

+-------+---------------------------------------------+
|product|collect_list(map(purchaser, amount) AS `map`)|
+-------+---------------------------------------------+
|tv     |[[Roger -> 36]]                              |
|book   |[[John -> 59], [Björn -> 61]]                |
+-------+---------------------------------------------+

This is what I want to transform it into:

+-------+------------------------------------------------------------------+
|product|json                                                              |
+-------+------------------------------------------------------------------+
|tv     |[{purchaser: "Roger", amount:36}]                                 |
|book   |[{purchaser: "John", amount:59}, {purchaser: "Björn", amount:61}] |
+-------+------------------------------------------------------------------+

Upvotes: 4

Views: 3515

Answers (1)

robynico

Reputation: 236

The solution:

ds.groupBy(col("product"))
  .agg(collect_list(to_json(struct(col("purchaser"), col("amount")))).alias("json"));
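To check the expected shape without a Spark session, here is a minimal plain-Java sketch of what this aggregation does to the sample rows. It is not Spark code. `Purchase`, `toJson`, `aggregate` and `toJsonArray` are hypothetical names introduced for illustration, and the hand-rolled serializer only escapes quotes and backslashes, which is enough for this data but not for arbitrary strings.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class JsonAggregationSketch {

    // Hypothetical stand-in for the question's Bean class.
    static final class Purchase {
        final String product;
        final String purchaser;
        final int amount;

        Purchase(String product, String purchaser, int amount) {
            this.product = product;
            this.purchaser = purchaser;
            this.amount = amount;
        }
    }

    // Plays the role of to_json(struct(purchaser, amount)) for a single row.
    // Assumption: only backslashes and double quotes need escaping here.
    static String toJson(Purchase p) {
        String name = p.purchaser.replace("\\", "\\\\").replace("\"", "\\\"");
        return "{\"purchaser\":\"" + name + "\",\"amount\":" + p.amount + "}";
    }

    // Plays the role of groupBy("product") + collect_list(...):
    // one list of JSON object strings per product, in first-seen order.
    static Map<String, List<String>> aggregate(List<Purchase> rows) {
        Map<String, List<String>> byProduct = new LinkedHashMap<>();
        for (Purchase p : rows) {
            byProduct.computeIfAbsent(p.product, k -> new ArrayList<>()).add(toJson(p));
        }
        return byProduct;
    }

    // Joins the collected objects into one JSON array string.
    static String toJsonArray(List<String> objects) {
        return "[" + String.join(",", objects) + "]";
    }

    public static void main(String[] args) {
        List<Purchase> data = Arrays.asList(
            new Purchase("book", "John", 59),
            new Purchase("book", "Bj\u00f6rn", 61),
            new Purchase("tv", "Roger", 36));

        // Print one line per product with its JSON array.
        aggregate(data).forEach((product, objects) ->
            System.out.println(product + " -> " + toJsonArray(objects)));
    }
}
```

The sketch makes one detail visible: collecting per-row JSON strings gives a list of strings for each product, not a single string. If the target index expects one JSON array string per product, the list still has to be joined or serialized as a whole, which is what `toJsonArray` stands in for here.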

Upvotes: 9
