Karthik

Reputation: 71

Spark GroupBy and Aggregate Strings to Produce a Map of Counts of the Strings Based on a Condition

I have a dataframe with multiple columns, two of which are id and label, as shown below.

+---+------+
| id| label|
+---+------+
|  1| "abc"|
|  1| "abc"|
|  1| "def"|
|  2| "def"|
|  2| "def"|
+---+------+
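For reference, the sample dataframe can be built like this (a minimal sketch, assuming a SparkSession named spark):

// Build the sample data from the table above
import spark.implicits._

val df = Seq(
  (1, "abc"), (1, "abc"), (1, "def"),
  (2, "def"), (2, "def")
).toDF("id", "label")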

I want to groupBy "id" and aggregate the label column into a map of each label to its count (ignoring nulls). The expected result is shown below:

+---+-------------------+
| id| label             |
+---+-------------------+
|  1| {"abc":2, "def":1}|
|  2| {"def":2}         |
+---+-------------------+

Is it possible to do this without using user-defined aggregate functions? I saw a similar answer here, but it doesn't aggregate based on the count of each item.

I apologize if this question is silly; I am new to both Scala and Spark.

Thanks

Upvotes: 1

Views: 1703

Answers (1)

sen

Reputation: 198

Without Custom UDFs

import org.apache.spark.sql.functions.{map, collect_list}
import spark.implicits._ // for the $"colName" column syntax

df.groupBy("id", "label")
  .count()                                          // count per (id, label) pair
  .select($"id", map($"label", $"count").as("map")) // wrap each pair in a one-entry map
  .groupBy("id")
  .agg(collect_list("map"))                         // collect the one-entry maps per id
  .show(false)

+---+------------------------+
|id |collect_list(map)       |
+---+------------------------+
|1  |[[def -> 1], [abc -> 2]]|
|2  |[[def -> 2]]            |
+---+------------------------+
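As a side note, if you are on Spark 2.4 or later, the single merged map from the question can be produced with built-ins alone. A minimal sketch using map_from_entries, assuming the same df and spark as above:

// Spark 2.4+ built-ins only: one merged map per id, no UDF needed
import org.apache.spark.sql.functions.{collect_list, map_from_entries, struct}
import spark.implicits._

df.groupBy("id", "label")
  .count()                 // count per (id, label) pair
  .groupBy("id")           // build (label, count) entries, then merge them into one map
  .agg(map_from_entries(collect_list(struct($"label", $"count"))).as("map"))
  .show(false)

This yields a single map column per id, in the {"abc": 2, "def": 1} shape the question asks for.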

Using a Custom UDF

import org.apache.spark.sql.functions.{udf, collect_list}
import spark.implicits._ // for the $"colName" column syntax

// Turn a sequence of labels into a label -> count map
val customUdf = udf((seq: Seq[String]) => {
  seq.groupBy(x => x).map(x => x._1 -> x._2.size)
})

df.groupBy("id")
  .agg(collect_list("label").as("list"))       // all labels per id
  .select($"id", customUdf($"list").as("map")) // reduce the list to a count map
  .show(false)

+---+--------------------+
|id |map                 |
+---+--------------------+
|1  |[abc -> 2, def -> 1]|
|2  |[def -> 2]          |
+---+--------------------+
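Note that collect_list skips null values, so the UDF version handles the "ignore null" requirement automatically. In the first version a null label would survive the groupBy, so if your data can contain nulls, filter them out first, e.g. with df.filter($"label".isNotNull).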

Upvotes: 2
