Reputation: 695
I have a data set like below:
+-------+-------------------+
|     id|                 ts|
+-------+-------------------+
|      b|2017-01-01 00:00:01|
|      b|2017-01-01 00:00:02|
|      b|2017-01-01 00:00:03|
|      b|2017-01-01 00:00:04|
|      b|2017-01-01 00:00:06|
|      b|2017-01-01 00:00:07|
|      d|2017-01-01 00:01:07|
|      d|2017-01-01 00:01:09|
|      d|2017-01-01 00:01:10|
|      d|2017-01-01 00:01:11|
|      d|2017-01-01 00:01:13|
|      d|2017-01-01 00:01:14|
+-------+-------------------+
I want to apply an aggregation on the timestamps that share the same id, and apply the aggregation on the ts data in ascending order. What I did is use a UDAF:
import org.apache.spark.sql.{Encoder, Encoders, Row}
import org.apache.spark.sql.expressions.Aggregator

abstract class TsGroupAgg[OUT](f: List[Long] => OUT)
    extends Aggregator[Row, String, OUT] {

  def zero: String = ""

  def reduce(buffer: String, dataInGroup: Row): String =
    buffer + s";${dataInGroup.getString(1)}"

  def merge(b1: String, b2: String): String = s"$b1;$b2"

  def finish(r: String): OUT = {
    val list = r.split(";").toList
    f(list.filter(_.length > 0).map(DateUtils.getTimestamp))
  }

  def bufferEncoder: Encoder[String] = Encoders.STRING
}
and
def tsGrpCal: TypedColumn[Row, Int] =
  new TsGroupCnt(calculateGroupTs).toColumn.name("tsGrpCal")

df.groupBy("id").agg(tsGrpCal)
As you can see, I group the data in the dataframe by "id" and apply my own aggregator. In my aggregator, I collect all the ts data in a string; in the final step, I convert that string back into a list, sort it, and apply the calculateGroupTs method to the list. Inside calculateGroupTs, I can apply the aggregation in ascending order of ts.
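For reference, the concrete subclass TsGroupCnt and calculateGroupTs are not shown above; a minimal sketch of what they might look like (the gap-counting logic is just an assumption for illustration) is:

// Hypothetical concrete subclass; Aggregator also requires an outputEncoder.
class TsGroupCnt(f: List[Long] => Int) extends TsGroupAgg[Int](f) {
  def outputEncoder: Encoder[Int] = Encoders.scalaInt
}

// Hypothetical aggregation: count gaps longer than one second between
// consecutive timestamps (epoch millis assumed from DateUtils.getTimestamp).
def calculateGroupTs(tss: List[Long]): Int =
  tss.sorted.sliding(2).count {
    case List(a, b) => b - a > 1000L
    case _          => false
  }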
There's a problem: collecting all the ts data in a string is not a good approach; it's ugly, and when the data volume gets big (around 1M rows) it causes an OOM error.
So, is there a way to apply an aggregation method on grouped data in order?
Upvotes: 0
Views: 1087
Reputation: 74679
I'm wondering why you don't use the window aggregate functions that are available in Spark SQL out of the box and will give you the best performance.
is there a way to apply an aggregation method on grouped data in order?
I think so. See below. The order is guaranteed to be the input order, so sort it according to your needs and apply the aggregation.
val timeseries = spark.read.option("header", true).csv("timeseries.csv")
scala> timeseries.show
+---+-------------------+
| id|                 ts|
+---+-------------------+
|  b|2017-01-01 00:00:01|
|  b|2017-01-01 00:00:02|
|  b|2017-01-01 00:00:03|
|  b|2017-01-01 00:00:04|
|  b|2017-01-01 00:00:06|
|  b|2017-01-01 00:00:07|
|  d|2017-01-01 00:01:07|
|  d|2017-01-01 00:01:09|
|  d|2017-01-01 00:01:10|
|  d|2017-01-01 00:01:11|
|  d|2017-01-01 00:01:13|
|  d|2017-01-01 00:01:14|
+---+-------------------+
val tss = timeseries.groupBy("id").agg(collect_list("ts") as "tss")
scala> tss.show(false)
+---+------------------------------------------------------------------------------------------------------------------------------+
|id |tss |
+---+------------------------------------------------------------------------------------------------------------------------------+
|d |[2017-01-01 00:01:07, 2017-01-01 00:01:09, 2017-01-01 00:01:10, 2017-01-01 00:01:11, 2017-01-01 00:01:13, 2017-01-01 00:01:14]|
|b |[2017-01-01 00:00:01, 2017-01-01 00:00:02, 2017-01-01 00:00:03, 2017-01-01 00:00:04, 2017-01-01 00:00:06, 2017-01-01 00:00:07]|
+---+------------------------------------------------------------------------------------------------------------------------------+
val merged = tss.select($"id", concat_ws(";", $"tss") as "merge")
scala> merged.show(false)
+---+-----------------------------------------------------------------------------------------------------------------------+
|id |merge |
+---+-----------------------------------------------------------------------------------------------------------------------+
|d |2017-01-01 00:01:07;2017-01-01 00:01:09;2017-01-01 00:01:10;2017-01-01 00:01:11;2017-01-01 00:01:13;2017-01-01 00:01:14|
|b |2017-01-01 00:00:01;2017-01-01 00:00:02;2017-01-01 00:00:03;2017-01-01 00:00:04;2017-01-01 00:00:06;2017-01-01 00:00:07|
+---+-----------------------------------------------------------------------------------------------------------------------+
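If you want to be explicit about the ascending order, the built-in sort_array can sort the collected array before merging. This is just a small variant of the line above; it assumes that for the yyyy-MM-dd HH:mm:ss format the string order matches the time order (it does, since the fields go from most to least significant):

val mergedSorted = tss.select($"id", concat_ws(";", sort_array($"tss")) as "merge")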
Anything from the typed API or a custom Aggregator usually leads to poorer performance, and these days I tend to claim that the more built-in functions you use, the better the performance. Just check the physical plan. I'm not saying it's the best physical plan because of the groupBy, but using custom Scala code for this use case can give an even worse plan.
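As a minimal sketch of the window aggregate functions mentioned at the start of this answer: the window below is partitioned by id and ordered by ts, so whatever you compute over it sees each group's rows in ascending timestamp order. The gap column is only an assumed example of the per-group aggregation, not the question's calculateGroupTs.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Every function evaluated over this window sees the rows of a group
// in ascending timestamp order.
val byIdOrderedByTs = Window.partitionBy("id").orderBy("ts")

// Assumed example aggregation: the gap in seconds to the previous row.
val withGaps = timeseries
  .withColumn("ts", $"ts".cast("timestamp"))
  .withColumn("prev_ts", lag($"ts", 1).over(byIdOrderedByTs))
  .withColumn("gap_secs",
    unix_timestamp($"ts") - unix_timestamp($"prev_ts"))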
Upvotes: 1