K F

Reputation: 695

How to apply aggregation in groups in specified order?

I have a data set like below:

+-------+-------------------+
|     id|                 ts|
+-------+-------------------+
|      b|2017-01-01 00:00:01|
|      b|2017-01-01 00:00:02|
|      b|2017-01-01 00:00:03|
|      b|2017-01-01 00:00:04|
|      b|2017-01-01 00:00:06|
|      b|2017-01-01 00:00:07|
|      d|2017-01-01 00:01:07|
|      d|2017-01-01 00:01:09|
|      d|2017-01-01 00:01:10|
|      d|2017-01-01 00:01:11|
|      d|2017-01-01 00:01:13|
|      d|2017-01-01 00:01:14|
+-------+-------------------+

I want to apply an aggregation over the timestamps that share the same id, and apply that aggregation to the ts data in ascending order. What I did was use a UDAF:

import org.apache.spark.sql.{Encoder, Encoders, Row}
import org.apache.spark.sql.expressions.Aggregator

abstract class TsGroupAgg[OUT](f: List[Long] => OUT)
    extends Aggregator[Row, String, OUT] {
  // buffer holds every ts of the group concatenated into one string
  def zero: String = ""
  def reduce(buffer: String, dataInGroup: Row): String =
    buffer + s";${dataInGroup.getString(1)}"

  def merge(b1: String, b2: String): String = s"$b1;$b2"

  // split the buffer back into timestamps and hand them to f
  def finish(r: String): OUT = {
    val list = r.split(";").toList
    f(list.filter(_.length > 0).map(DateUtils.getTimestamp))
  }

  def bufferEncoder: Encoder[String] = Encoders.STRING
}

and

def tsGrpCal: TypedColumn[Row, Int] =
  new TsGroupCnt(calculateGroupTs).toColumn.name("tsGrpCal")

df.groupBy("id").agg(tsGrpCal)
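
The concrete TsGroupCnt is not shown here; a plausible skeleton (only a guess at its shape, since just the name appears above) would pass calculateGroupTs through to TsGroupAgg and supply the one member left abstract, the output encoder:

class TsGroupCnt(f: List[Long] => Int) extends TsGroupAgg[Int](f) {
  // the only member TsGroupAgg leaves abstract
  def outputEncoder: Encoder[Int] = Encoders.scalaInt
}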

As you can see, I group the data in the dataframe by "id" and apply my own aggregator. In the aggregator I collect all the ts values of a group into a string, and in the final step I split that string back into a list, sort it, and apply the calculateGroupTs method to it. Inside calculateGroupTs I can then run the aggregation in ascending order of ts.

The problem is that collecting all the ts data into a string is an ugly approach, and when the data volume gets large (around 1M rows) it causes an OOM. So, is there a way to apply an aggregation method on grouped data in order?

Upvotes: 0

Views: 1087

Answers (1)

Jacek Laskowski

Reputation: 74679

I'm wondering why you don't use the window aggregate functions that are available in Spark SQL out of the box and that will give you the best performance?
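
A rough sketch of what that could look like here (only a sketch, reusing the timeseries DataFrame loaded below; the unbounded frame makes collect_list gather every ts of an id in ascending ts order):

import org.apache.spark.sql.expressions.Window

val byId = Window.partitionBy("id").orderBy("ts")
  .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

// one row per id, timestamps already collected in ascending order
val tssWindowed = timeseries
  .withColumn("tss", collect_list("ts") over byId)
  .select("id", "tss")
  .distinct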

is there a way to apply aggregation method on grouped data in order?

I think so. See below. The order is guaranteed to be the input order, so sort it according to your needs and apply the aggregation.

val timeseries = spark.read.option("header", true).csv("timeseries.csv")
scala> timeseries.show
+---+-------------------+
| id|                 ts|
+---+-------------------+
|  b|2017-01-01 00:00:01|
|  b|2017-01-01 00:00:02|
|  b|2017-01-01 00:00:03|
|  b|2017-01-01 00:00:04|
|  b|2017-01-01 00:00:06|
|  b|2017-01-01 00:00:07|
|  d|2017-01-01 00:01:07|
|  d|2017-01-01 00:01:09|
|  d|2017-01-01 00:01:10|
|  d|2017-01-01 00:01:11|
|  d|2017-01-01 00:01:13|
|  d|2017-01-01 00:01:14|
+---+-------------------+

val tss = timeseries.groupBy("id").agg(collect_list("ts") as "tss")
scala> tss.show(false)
+---+------------------------------------------------------------------------------------------------------------------------------+
|id |tss                                                                                                                           |
+---+------------------------------------------------------------------------------------------------------------------------------+
|d  |[2017-01-01 00:01:07, 2017-01-01 00:01:09, 2017-01-01 00:01:10, 2017-01-01 00:01:11, 2017-01-01 00:01:13, 2017-01-01 00:01:14]|
|b  |[2017-01-01 00:00:01, 2017-01-01 00:00:02, 2017-01-01 00:00:03, 2017-01-01 00:00:04, 2017-01-01 00:00:06, 2017-01-01 00:00:07]|
+---+------------------------------------------------------------------------------------------------------------------------------+
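
If you'd rather not rely on the input order surviving the shuffle, sorting the collected list explicitly is a one-liner with the built-in sort_array (for this yyyy-MM-dd HH:mm:ss format, string order is chronological order):

val tssSorted = timeseries
  .groupBy("id")
  .agg(sort_array(collect_list("ts")) as "tss")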

val merged = tss.select($"id", concat_ws(";", $"tss") as "merge")
scala> merged.show(false)
+---+-----------------------------------------------------------------------------------------------------------------------+
|id |merge                                                                                                                  |
+---+-----------------------------------------------------------------------------------------------------------------------+
|d  |2017-01-01 00:01:07;2017-01-01 00:01:09;2017-01-01 00:01:10;2017-01-01 00:01:11;2017-01-01 00:01:13;2017-01-01 00:01:14|
|b  |2017-01-01 00:00:01;2017-01-01 00:00:02;2017-01-01 00:00:03;2017-01-01 00:00:04;2017-01-01 00:00:06;2017-01-01 00:00:07|
+---+-----------------------------------------------------------------------------------------------------------------------+
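
From there, the per-group calculation from the question can be applied to the collected array with a plain UDF instead of a custom Aggregator. A sketch only; calculateGroupTs below is a stand-in for the question's own function:

import java.sql.Timestamp
import org.apache.spark.sql.functions.udf

// stand-in for the question's calculateGroupTs (here it just counts the group)
def calculateGroupTs(times: Seq[Long]): Int = times.size

val tsGrpCal = udf { times: Seq[String] =>
  calculateGroupTs(times.map(s => Timestamp.valueOf(s).getTime).sorted)
}

val result = tss.select($"id", tsGrpCal($"tss") as "tsGrpCal")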

Anything from the typed API or using a custom Aggregator usually leads to poorer performance, and these days I'm more and more inclined to claim that the more built-in functions you use, the better the performance.

Just check the physical plan.
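
In the spark-shell the plan can be printed with explain (output not reproduced here):

scala> merged.explain(true)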

[image: physical plan of the query]

I'm not saying it's the best possible physical plan, given the groupBy, but using custom Scala code for this use case can give an even worse plan.

Upvotes: 1
