Eldinea

Reputation: 165

Spark Structured Streaming - Custom aggregation with event-time window

I am trying to do a custom aggregation on a structured stream with event-time windowing.
First, I tried to use the Aggregator interface (typed UDAF) with the .agg function, something like:

val aggregatedDataset = streamDataset
  .select($"id", $"time", $"eventType", $"eventValue")
  .groupBy(window($"time", "1 hour"), $"id")
  .agg(CustomAggregator("time", "eventType", "eventValue").toColumn.as("aggregation"))

Yet this aggregation (in its reduce function) only operates on each new input element, not on the whole group.
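For reference, a minimal sketch of what such a typed Aggregator might look like. The Event and AggBuffer shapes and the averaging logic are illustrative assumptions, not taken from the question:

```scala
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{Encoder, Encoders}

// Hypothetical input row and intermediate buffer (names are assumptions).
case class Event(time: java.sql.Timestamp, eventType: String, eventValue: Double)
case class AggBuffer(count: Long, sum: Double)

// A typed Aggregator averaging eventValue per group. Spark calls reduce once
// per NEW input row and merge to combine partial buffers; the accumulated
// state lives in the buffer, not in the rows that reduce sees.
object CustomAggregator extends Aggregator[Event, AggBuffer, Double] {
  def zero: AggBuffer = AggBuffer(0L, 0.0)
  def reduce(b: AggBuffer, e: Event): AggBuffer =
    AggBuffer(b.count + 1, b.sum + e.eventValue)
  def merge(b1: AggBuffer, b2: AggBuffer): AggBuffer =
    AggBuffer(b1.count + b2.count, b1.sum + b2.sum)
  def finish(b: AggBuffer): Double =
    if (b.count == 0) 0.0 else b.sum / b.count
  def bufferEncoder: Encoder[AggBuffer] = Encoders.product[AggBuffer]
  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}
```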

So I am trying to use the GroupState functions (mapGroupsWithState, flatMapGroupsWithState), or even just the mapGroups function (without state), to perform my aggregation.

But my groupBy operation returns a RelationalGroupedDataset, and I need a KeyValueGroupedDataset to use the map functions. groupByKey does not work with windowing.
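One workaround worth sketching: since groupByKey cannot take the window() column directly, you can compute the window start yourself and group on a (windowStart, id) tuple. This is a sketch under the assumption that the stream is a typed Dataset of events; note it gives you fixed tumbling buckets, not watermark-based window semantics:

```scala
import java.sql.Timestamp

// Truncate an event time down to the start of its tumbling window.
def windowStart(ts: Timestamp, windowMillis: Long): Long =
  (ts.getTime / windowMillis) * windowMillis

// Assumed usage (Event is a hypothetical case class with time and id fields,
// and updateStateFn is your state-update function):
// streamDataset
//   .groupByKey(e => (windowStart(e.time, 60L * 60 * 1000), e.id)) // 1-hour buckets
//   .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout)(updateStateFn)
```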

How can I perform a custom aggregation with Structured Streaming and event-time windows?

Thanks!

Upvotes: 2

Views: 1280

Answers (1)

himanshuIIITian

Reputation: 6095

The GroupState functions (mapGroupsWithState, flatMapGroupsWithState), or mapGroups (without state), are only needed to perform an aggregation when we have to operate in Update output mode.

But if we are using Complete output mode, then we do not need the GroupState functions.

So, if you change the output mode of the aggregatedDataset query to Complete, it will work as expected.
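A minimal sketch of that change, assuming a console sink just for illustration:

```scala
// Sketch: write the aggregatedDataset from the question in Complete mode.
// In Complete mode, Spark re-emits the full aggregated result every trigger,
// so the aggregation covers the whole group, not only the newly arrived rows.
val query = aggregatedDataset.writeStream
  .outputMode("complete") // instead of "update" or "append"
  .format("console")      // sink choice is an assumption for the example
  .start()

query.awaitTermination()
```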

I hope it helps!

Upvotes: 1
