Reputation: 236
How can I transform the data below so that I can store it in Elasticsearch?
Here is a dataset of beans that I would like to aggregate by product into a JSON array.
List<Bean> data = new ArrayList<Bean>();
data.add(new Bean("book","John",59));
data.add(new Bean("book","Björn",61));
data.add(new Bean("tv","Roger",36));
Dataset<Row> ds = spark.createDataFrame(data, Bean.class);
ds.show(false);
+------+-------+---------+
|amount|product|purchaser|
+------+-------+---------+
|59    |book   |John     |
|61    |book   |Björn    |
|36    |tv     |Roger    |
+------+-------+---------+
ds = ds.groupBy(col("product")).agg(collect_list(map(ds.col("purchaser"),ds.col("amount")).as("map")));
ds.show(false);
+-------+---------------------------------------------+
|product|collect_list(map(purchaser, amount) AS `map`)|
+-------+---------------------------------------------+
|tv     |[[Roger -> 36]]                              |
|book   |[[John -> 59], [Björn -> 61]]                |
+-------+---------------------------------------------+
This is what I want to transform it into:
+-------+------------------------------------------------------------------+
|product|json                                                              |
+-------+------------------------------------------------------------------+
|tv     |[{purchaser: "Roger", amount:36}]                                 |
|book   |[{purchaser: "John", amount:59}, {purchaser: "Björn", amount:61}] |
+-------+------------------------------------------------------------------+
Upvotes: 4
Views: 3515
Reputation: 236
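The solution: pack the columns of each row into a struct, serialize it with to_json, and collect the resulting strings per product. The alias goes on the aggregate itself so that the result column is named "json".
import static org.apache.spark.sql.functions.*;
ds.groupBy(col("product"))
  .agg(collect_list(to_json(struct(col("purchaser"), col("amount")))).alias("json"));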
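To make the resulting shape concrete without a Spark session, here is a minimal plain-Java sketch (no Spark involved) that mirrors what the aggregation above computes: one JSON string per row, grouped into a list per product. The names `GroupToJson`, `Purchase`, `toJson`, and `groupAsJson` are hypothetical and exist only for this illustration. The sketch also assumes purchaser names need no JSON escaping. Note that `to_json` emits standard JSON with quoted keys, so each element looks like `{"purchaser":"John","amount":59}` rather than the unquoted form shown in the question.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class GroupToJson {
    // Hypothetical stand-in for the question's Bean class.
    public static final class Purchase {
        final String product;
        final String purchaser;
        final int amount;

        public Purchase(String product, String purchaser, int amount) {
            this.product = product;
            this.purchaser = purchaser;
            this.amount = amount;
        }
    }

    // Render one row as a JSON object with quoted keys.
    // Assumption: the purchaser name contains no characters that need escaping.
    public static String toJson(Purchase p) {
        return "{\"purchaser\":\"" + p.purchaser + "\",\"amount\":" + p.amount + "}";
    }

    // Group rows by product, keeping one JSON string per row in encounter order.
    public static Map<String, List<String>> groupAsJson(List<Purchase> rows) {
        Map<String, List<String>> grouped = new LinkedHashMap<>();
        for (Purchase p : rows) {
            grouped.computeIfAbsent(p.product, k -> new ArrayList<>()).add(toJson(p));
        }
        return grouped;
    }

    public static void main(String[] args) {
        List<Purchase> rows = Arrays.asList(
            new Purchase("book", "John", 59),
            new Purchase("book", "Bj\u00f6rn", 61), // "Björn", written as an escape to avoid source-encoding issues
            new Purchase("tv", "Roger", 36));
        // Print each product followed by its list of JSON strings.
        groupAsJson(rows).forEach((product, json) -> System.out.println(product + " " + json));
    }
}
```

As in the Spark version, each product maps to a list of JSON strings, not to a single JSON document. If a single string per product is needed, the list elements would still have to be joined into one array literal.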
Upvotes: 9