Reputation: 165
I am trying to do a custom aggregation on a Structured Streaming query with event-time windowing.
First I tried to use the Aggregator interface (a typed UDAF) with the .agg function, something like:
val aggregatedDataset = streamDataset
  .select($"id", $"time", $"eventType", $"eventValue")
  .groupBy(window($"time", "1 hour"), $"id")
  .agg(CustomAggregator("time", "eventType", "eventValue").toColumn.as("aggregation"))
Yet this aggregation (in the reduce function) only operates on the new input elements, not on the whole group.
So I am trying to use the GroupState functions (mapGroupsWithState, flatMapGroupsWithState), or even just mapGroups (without the state), to perform my aggregation.
But my groupBy operation returns a RelationalGroupedDataset, and I need a KeyValueGroupedDataset to use the map functions. groupByKey does not work with windowing.
How can I do a custom aggregation with Structured Streaming and event-time windows?
Thanks!
Upvotes: 2
Views: 1280
Reputation: 6095
The GroupState functions (mapGroupsWithState, flatMapGroupsWithState), or mapGroups (without the state), are needed for an aggregation only when the query has to run in Update output mode.
If you use Complete output mode, you do not need the GroupState functions.
So, if you change the output mode of the aggregatedDataset query to Complete, it will work as expected.
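As a rough illustration of that suggestion, here is a minimal sketch and not your actual job. It assumes Spark 3.x (for functions.udaf) and uses the built-in rate source in place of your stream. MeanValue, CompleteModeSketch, the derived id column and the 1 minute window are all made up for the example. Spark keeps the aggregation buffer in state between triggers, so with Complete mode each trigger should print the result for the whole group, not only for the newly arrived rows. Note that Complete mode retains every window in state, so this may not suit long-running queries.

```scala
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.functions.{udaf, window}

// Hypothetical aggregator (not from the question): mean of a Long column.
// The buffer holds (sum, count).
object MeanValue extends Aggregator[Long, (Long, Long), Double] {
  def zero: (Long, Long) = (0L, 0L)
  // Fold one input value into the buffer.
  def reduce(buf: (Long, Long), value: Long): (Long, Long) =
    (buf._1 + value, buf._2 + 1)
  // Combine two partial buffers.
  def merge(a: (Long, Long), b: (Long, Long)): (Long, Long) =
    (a._1 + b._1, a._2 + b._2)
  def finish(buf: (Long, Long)): Double =
    if (buf._2 == 0) 0.0 else buf._1.toDouble / buf._2
  def bufferEncoder: Encoder[(Long, Long)] =
    Encoders.tuple(Encoders.scalaLong, Encoders.scalaLong)
  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

object CompleteModeSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("complete-mode-sketch")
      .getOrCreate()
    import spark.implicits._

    // Stand-in for streamDataset: the rate source emits (timestamp, value).
    val events = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "5")
      .load()
      .select(($"value" % 3).as("id"), $"timestamp".as("time"), $"value".as("eventValue"))

    // Wrap the typed Aggregator so it can be applied to untyped columns.
    val meanValue = udaf(MeanValue)

    val aggregatedDataset = events
      .groupBy(window($"time", "1 minute"), $"id")
      .agg(meanValue($"eventValue").as("aggregation"))

    // Complete mode: every trigger emits the full result table.
    val query = aggregatedDataset.writeStream
      .outputMode("complete")
      .format("console")
      .option("truncate", "false")
      .start()

    query.awaitTermination()
  }
}
```

Your CustomAggregator would take the place of MeanValue here; only the outputMode("complete") line is the change I am suggesting.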
I hope it helps!
Upvotes: 1