Reputation: 149
I have a highly parallelized aggregation with a lot of keys that I am running across multiple nodes. I then want to do a summary aggregation across all values, similar to the code below:
val myStream = sourceStream
  .keyBy(0)
  .window(TumblingProcessingTimeWindows.of(Time.minutes(5)))
  .reduce(_ + _)
myStream.addSink(new OtherSink)

val summaryStream = myStream
  .map(Row.fromOtherRow(_))
  // parallelism is 1 by definition
  .windowAll(TumblingProcessingTimeWindows.of(Time.minutes(5)))
  .reduce(_ + _)
summaryStream.addSink(new RowSink)
This works fine, but I notice that the node that ends up running the windowAll() gets a tremendous amount of inbound network traffic, as well as a significant CPU spike. This is obviously because all of the data is being aggregated together and the parallelism is 1.
Are there any current or planned provisions in Flink for a two-tier summary aggregation that would keep the data on each node and pre-aggregate it there, before sending the results to a second tier for the final aggregation? Here is some pseudo code for what I had hoped to find:
val myStream = sourceStream
  .keyBy(0)
  .window(TumblingProcessingTimeWindows.of(Time.minutes(5)))
  .reduce(_ + _)
myStream.addSink(new OtherSink)

val summaryStream = myStream
  .map(Row.fromOtherRow(_))
  // parallelism would be at the default for the env
  .windowLocal(TumblingProcessingTimeWindows.of(Time.minutes(5)))
  .reduce(_ + _)
  // parallelism is 1 by definition
  .windowAll(TumblingProcessingTimeWindows.of(Time.minutes(5)))
  .reduce(_ + _)
summaryStream.addSink(new RowSink)
I called it 'windowLocal()', but I am sure there is a better name. It would be non-keyed, just like windowAll(). The key benefit is that it would reduce the network, CPU, and memory hit that windowAll() takes, by distributing that work across all of the nodes you are running. I currently have to allocate more resources to my nodes to accommodate this summarization.
If this can be accomplished in some other way with the current version, I would love to hear about it. I already thought about using a random value for a key for the second tier, but I believe that would result in a full rebalance of the data, so it would solve my CPU and memory issues, but not the network one. I am looking for something in the same vein as rescale(), where the data stays local to the task manager or the slot.
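To make the intent of the two-tier scheme concrete, here is a minimal plain-Scala sketch (no Flink API; `localReduce` and `globalReduce` are hypothetical names). Tier 1 reduces each node's elements locally, so only one partial result per node would ever cross the network to the single global reducer:

```scala
object TwoTierSketch {
  // Simulate elements already distributed across three nodes.
  val perNode: Seq[Seq[Int]] = Seq(Seq(1, 2, 3), Seq(4, 5), Seq(6))

  // Tier 1: each node reduces its own elements locally (no network).
  def localReduce(nodes: Seq[Seq[Int]]): Seq[Int] =
    nodes.map(_.sum)

  // Tier 2: only one partial result per node is shipped to the
  // single global reducer, instead of every raw element.
  def globalReduce(partials: Seq[Int]): Int =
    partials.sum

  def main(args: Array[String]): Unit = {
    val partials = localReduce(perNode) // Seq(6, 9, 6)
    println(globalReduce(partials))     // prints 21
  }
}
```

Because addition is associative, the two-tier result equals the single-tier result over all raw elements; that is the property any 'windowLocal()'-style pre-aggregation would rely on.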
Upvotes: 3
Views: 926
Reputation: 452
Incremental Window Aggregation with FoldFunction
The following example shows how an incremental FoldFunction can be combined with a WindowFunction to extract the number of events in the window, and to also return the key and the end time of the window.
val input: DataStream[SensorReading] = ...
input
  .keyBy(<key selector>)
  .timeWindow(<window assigner>)
  .fold(
    ("", 0L, 0),
    (acc: (String, Long, Int), r: SensorReading) => ("", 0L, acc._3 + 1),
    (key: String,
     window: TimeWindow,
     counts: Iterable[(String, Long, Int)],
     out: Collector[(String, Long, Int)]) => {
      val count = counts.iterator.next()
      out.collect((key, window.getEnd, count._3))
    }
  )
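The core of the pattern above is that the fold keeps a single accumulator per window instead of buffering every event. A plain-Scala sketch of just that accumulation step (no Flink API; `FoldCountSketch` is a hypothetical name, and the key and window-end slots are filled in later by the WindowFunction):

```scala
object FoldCountSketch {
  // Accumulator shape matches the Flink example: (key, windowEnd, count).
  type Acc = (String, Long, Int)

  // Each incoming reading only bumps the count; the window never has
  // to retain the readings themselves just to count them.
  def foldCount(readings: Seq[Double]): Acc =
    readings.foldLeft(("", 0L, 0): Acc) {
      case ((key, end, n), _) => (key, end, n + 1)
    }
}
```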
Incremental Window Aggregation with ReduceFunction
The following example shows how an incremental ReduceFunction can be combined with a WindowFunction to return the smallest event in a window along with the start time of the window.
val input: DataStream[SensorReading] = ...
input
  .keyBy(<key selector>)
  .timeWindow(<window assigner>)
  .reduce(
    (r1: SensorReading, r2: SensorReading) => if (r1.value > r2.value) r2 else r1,
    (key: String,
     window: TimeWindow,
     minReadings: Iterable[SensorReading],
     out: Collector[(Long, SensorReading)]) => {
      val min = minReadings.iterator.next()
      out.collect((window.getStart, min))
    }
  )
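The reduce lambda above is an ordinary binary min-by-value; here is a self-contained plain-Scala sketch of the same comparison (no Flink API; the `SensorReading` fields are assumed from the example's usage):

```scala
object ReduceMinSketch {
  case class SensorReading(id: String, timestamp: Long, value: Double)

  // Same comparison as the Flink ReduceFunction above: keep the
  // reading with the smaller value.
  def min(r1: SensorReading, r2: SensorReading): SensorReading =
    if (r1.value > r2.value) r2 else r1

  // Applying it across a window's readings yields the smallest one,
  // which is the single element the WindowFunction then receives.
  def windowMin(readings: Seq[SensorReading]): SensorReading =
    readings.reduce(min)
}
```

Because reduce only ever holds one element of state per window, it gives the same incremental-aggregation benefit as the fold variant.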
If you want more, see the windowing documentation:
https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/windows.html
Upvotes: 1