BlueSheepToken
BlueSheepToken

Reputation: 6109

How to create a map column to count occurrences without udaf

I would like to create a Map column which counts the number of occurrences.

For instance:

+---+----+
|  b|   a|
+---+----+
|  1|   b|
|  2|null|
|  1|   a|
|  1|   a|
+---+----+

would result in

+---+--------------------+
|  b|                 res|
+---+--------------------+
|  1|[a -> 2.0, b -> 1.0]|
|  2|                  []|
+---+--------------------+

For the moment, in Spark 2.4.6, I was able to make it using udaf.

While bumping to Spark3 I was wondering if I could get rid of this udaf (I tried using the new method aggregate without success)

Is there an efficient way to do it? (For the efficiency part, I am able to test easily)

Upvotes: 3

Views: 1711

Answers (4)

Raphael Roth
Raphael Roth

Reputation: 27373

Here a Spark 3 solution:

import org.apache.spark.sql.functions._

df.groupBy($"b",$"a").count()
  .groupBy($"b")
  .agg(
    map_from_entries(
      collect_list(
        when($"a".isNotNull,struct($"a",$"count"))
      )
    ).as("res")
  )
  .show()

gives:

+---+----------------+
|  b|             res|
+---+----------------+
|  1|[b -> 1, a -> 2]|
|  2|              []|
+---+----------------+

Here the solution using Aggregator:

import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Encoder

val countOcc = new Aggregator[String, Map[String,Int], Map[String,Int]] with Serializable {
    def zero: Map[String,Int] = Map.empty.withDefaultValue(0)
    def reduce(b: Map[String,Int], a: String) = if(a!=null) b + (a -> (b(a) + 1)) else b
    def merge(b1: Map[String,Int], b2: Map[String,Int]) = {
      val keys = b1.keys.toSet.union(b2.keys.toSet)
      keys.map{ k => (k -> (b1(k) + b2(k))) }.toMap
    }
    def finish(b: Map[String,Int]) = b
    def bufferEncoder: Encoder[Map[String,Int]] = implicitly(ExpressionEncoder[Map[String,Int]])
    def outputEncoder: Encoder[Map[String, Int]] = implicitly(ExpressionEncoder[Map[String, Int]])
}

val countOccUDAF = udaf(countOcc)

df
  .groupBy($"b")
  .agg(countOccUDAF($"a").as("res"))
  .show()

gives:

+---+----------------+
|  b|             res|
+---+----------------+
|  1|[b -> 1, a -> 2]|
|  2|              []|
+---+----------------+

Upvotes: 4

Raphael Roth
Raphael Roth

Reputation: 27373

You could always use collect_list with UDF, but only if you groupings are not too lage:

val udf_histo = udf((x:Seq[String]) => x.groupBy(identity).mapValues(_.size))

df.groupBy($"b")
  .agg(
    collect_list($"a").as("as")
  )
  .select($"b",udf_histo($"as").as("res"))
  .show()

gives:

+---+----------------+
|  b|             res|
+---+----------------+
|  1|[b -> 1, a -> 2]|
|  2|              []|
+---+----------------+

This should be faster than UDAF: Spark custom aggregation : collect_list+UDF vs UDAF

Upvotes: 2

werner
werner

Reputation: 14845

Here a solution with a single groupBy and a slightly complex sql expression. This solution works for Spark 2.4+

df.groupBy("b")
  .agg(expr("sort_array(collect_set(a)) as set"),
       expr("sort_array(collect_list(a)) as list"))
  .withColumn("res",
       expr("map_from_arrays(set,transform(set, x -> size(filter(list, y -> y=x))))"))
  .show()

Output:

+---+------+---------+----------------+
|  b|   set|     list|             res|
+---+------+---------+----------------+
|  1|[a, b]|[a, a, b]|[a -> 2, b -> 1]|
|  2|    []|       []|              []|
+---+------+---------+----------------+

The idea is to collect the data from column a twice: one time into a set and one time into a list. Then with the help of transform for each element of the set the number of occurences of the particular element in the list is counted. Finally, the set and the number of elements are combined with map_from_arrays.

However I cannot say if this approach is really faster than a UDAF.

Upvotes: 1

Sanket9394
Sanket9394

Reputation: 2091

We can achieve this is spark 2.4

//GET THE COUNTS
val groupedCountDf = originalDf.groupBy("b","a").count

//CREATE MAPS FOR EVERY COUNT | EMPTY MAP FOR NULL KEY
//AGGREGATE THEM AS ARRAY 

val dfWithArrayOfMaps =  groupedCountDf
.withColumn("newMap",  when($"a".isNotNull, map($"a",$"count")).otherwise(map()))
.groupBy("b").agg(collect_list($"newMap") as "multimap")

//EXPRESSION TO CONVERT ARRAY[MAP] -> MAP

val mapConcatExpr = expr("aggregate(multimap, map(), (k, v) -> map_concat(k, v))")

val finalDf = dfWithArrayOfMaps.select($"b", mapConcatExpr.as("merged_data"))

Upvotes: 1

Related Questions