Reputation: 97
How does one create tuple-based sliding windows in Apache Beam? This is very easy to do in Flink:
DataStream.countWindowAll(long size, long slide)
But the docs for Beam (or Dataflow) do not make it clear how to do this. Is it some combination of windows and triggers? Is it efficient?
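To make the question concrete, here is a minimal plain-Java sketch of the count-based sliding-window semantics being asked about. It assumes the window emits once every `slide` elements and contains at most the most recent `size` elements. The class `CountSlidingWindow` and its `add` method are hypothetical names used for illustration only; this is neither Beam nor Flink API.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Illustrative count-based sliding window; not part of Beam or Flink. */
public class CountSlidingWindow<T> {
    private final int size;   // maximum number of elements per emitted window
    private final int slide;  // emit a window every `slide` elements
    private final Deque<T> buffer = new ArrayDeque<>();
    private long seen = 0;

    public CountSlidingWindow(int size, int slide) {
        if (size <= 0 || slide <= 0) {
            throw new IllegalArgumentException("size and slide must be positive");
        }
        this.size = size;
        this.slide = slide;
    }

    /**
     * Adds one element. Returns a copy of the current window contents when
     * this element completes a slide, otherwise returns null.
     */
    public List<T> add(T element) {
        buffer.addLast(element);
        if (buffer.size() > size) {
            buffer.removeFirst(); // evict the oldest element beyond `size`
        }
        seen++;
        return (seen % slide == 0) ? new ArrayList<>(buffer) : null;
    }
}
```

With `size = 3` and `slide = 2`, feeding the elements 1 through 5 emits `[1, 2]` after the second element and `[2, 3, 4]` after the fourth.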
Upvotes: 0
Views: 386
Reputation: 17913
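Beam supports sliding windows natively through the SlidingWindows class, which defines windows by time (a window size and a slide period) rather than by element count. Please see the programming guide and the documentation for the SlidingWindows class.
For example: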
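import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

PCollection<Foo> foos = ...;
// Count.combineFn() produces Long counts, so the output is PCollection<Long>.
PCollection<Long> counts = foos
    // 5-minute windows, with a new window starting every minute.
    .apply(Window.<Foo>into(
        SlidingWindows.of(Duration.standardMinutes(5))
            .every(Duration.standardMinutes(1))))
    // With a non-global windowing function, use this form
    // instead of Count.globally().
    .apply(Combine.globally(Count.<Foo>combineFn()).withoutDefaults());
PCollection<String> formattedCounts = counts.apply(
    ParDo.of(new DoFn<Long, String>() {
      @ProcessElement
      public void process(ProcessContext c, BoundedWindow w) {
        // Emit one line per window with that window's element count.
        c.output("Window: " + w + ", count: " + c.element());
      }
    }));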
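To illustrate why each element above is counted in several windows, here is a small plain-Java sketch of how a timestamp maps onto overlapping time-based sliding windows. It assumes windows are half-open intervals `[start, start + size)` aligned to the epoch with no offset. `SlidingWindowAssignment` and `windowStarts` are hypothetical names for this illustration and are not Beam API.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative window assignment for time-based sliding windows; not Beam API. */
public class SlidingWindowAssignment {
    /**
     * Returns the start times (in milliseconds) of every window
     * [start, start + sizeMillis) that contains timestampMillis,
     * ordered from the latest start to the earliest.
     * Assumes window starts are multiples of periodMillis (zero offset).
     */
    public static List<Long> windowStarts(
            long timestampMillis, long sizeMillis, long periodMillis) {
        List<Long> starts = new ArrayList<>();
        // Latest window start that is at or before the timestamp.
        long lastStart = timestampMillis - Math.floorMod(timestampMillis, periodMillis);
        // Walk backwards one period at a time while the window still covers the timestamp.
        for (long start = lastStart;
                start > timestampMillis - sizeMillis;
                start -= periodMillis) {
            starts.add(start);
        }
        return starts;
    }
}
```

With a 5-minute size and a 1-minute period, an element stamped at 7 minutes 30 seconds falls into five windows, starting at minutes 7, 6, 5, 4, and 3, so it contributes to five separate counts.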
Triggering is a separate dimension of the problem: it controls when the data for a particular window is considered "complete enough" for the aggregation to be applied. See the programming guide for details.
Upvotes: 0