Reputation: 71
I have a dataframe with multiple columns, two of which are id and label, as shown below.
+---+-----+
| id|label|
+---+-----+
|  1|"abc"|
|  1|"abc"|
|  1|"def"|
|  2|"def"|
|  2|"def"|
+---+-----+
I want to groupBy "id" and aggregate the label column into a map from each label to its count (ignoring nulls). The expected result is shown below:
+---+------------------+
|id |label             |
+---+------------------+
|1  |{"abc":2, "def":1}|
|2  |{"def":2}         |
+---+------------------+
Is it possible to do this without using user-defined aggregate functions? I saw a similar answer here, but it doesn't aggregate based on the count of each item.
I apologize if this question is silly; I am new to both Scala and Spark.
Thanks
Upvotes: 1
Views: 1703
Reputation: 198
Without Custom UDFs
import org.apache.spark.sql.functions.{collect_list, map}
// The $"..." column syntax needs spark.implicits._ (already in scope in spark-shell).

df.groupBy("id", "label")
  .count()
  .select($"id", map($"label", $"count").as("map"))
  .groupBy("id")
  .agg(collect_list("map"))
  .show(false)
+---+------------------------+
|id |collect_list(map)       |
+---+------------------------+
|1  |[[def -> 1], [abc -> 2]]|
|2  |[[def -> 2]]            |
+---+------------------------+
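Note that the result above is an array of single-entry maps per id, not one map. A minimal sketch of one way to get a single map without a UDF, assuming Spark 2.4 or later (where map_from_entries is available). The local SparkSession, the sample data with an extra null label, and the names df and result are assumptions for illustration only:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{collect_list, map_from_entries, struct}

// Local session for illustration; in spark-shell, `spark` already exists.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("label-counts")
  .getOrCreate()
import spark.implicits._

// Sample data from the question, plus one null label (an assumption)
// to show how nulls are excluded.
val df = Seq[(Int, Option[String])](
  (1, Some("abc")), (1, Some("abc")), (1, Some("def")),
  (2, Some("def")), (2, Some("def")), (2, None)
).toDF("id", "label")

val result = df
  .filter($"label".isNotNull)   // map keys cannot be null, so drop them first
  .groupBy("id", "label")
  .count()                      // one row per (id, label) with its count
  .groupBy("id")
  // Collect (label, count) structs per id and turn them into one map.
  .agg(map_from_entries(collect_list(struct($"label", $"count"))).as("label"))

result.show(false)
```

The order of entries inside each map is not guaranteed, so the printed output may list the labels in a different order than the expected table in the question.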
Using a custom UDF:
import org.apache.spark.sql.functions.{collect_list, udf}

// Count how many times each label occurs in the collected list.
val customUdf = udf((seq: Seq[String]) => {
  seq.groupBy(x => x).map(x => x._1 -> x._2.size)
})

df.groupBy("id")
  .agg(collect_list("label").as("list"))
  .select($"id", customUdf($"list").as("map"))
  .show(false)
+---+--------------------+
|id |map                 |
+---+--------------------+
|1  |[abc -> 2, def -> 1]|
|2  |[def -> 2]          |
+---+--------------------+
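The counting logic inside the UDF can be tried on a plain Scala collection, without Spark. A small sketch, where labelCounts is a hypothetical helper name and the null filter is a defensive addition (collect_list already skips null values, so the UDF above should not normally receive any):

```scala
// Hypothetical helper mirroring the UDF body, with an explicit null filter.
def labelCounts(labels: Seq[String]): Map[String, Int] =
  labels
    .filter(_ != null)      // defensive: ignore null labels
    .groupBy(identity)      // label -> all occurrences of that label
    .map { case (label, occurrences) => label -> occurrences.size }

println(labelCounts(Seq("abc", "abc", "def", null)))
```

For the id 1 rows in the question, this yields a map with abc counted twice and def counted once.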
Upvotes: 2