Reputation: 3427
Suppose we have the following data structure:
Tuple2<ArrayList<Long>, Integer>
The first field is an ArrayList of length one containing a timestamp, and the Integer field is a number between 1 and 40 called the channel. The goal is to aggregate every 400 messages with the same key (channel) and apply a ReduceFunction to them (it just merges the timestamps of the 400 messages into the first field of the tuple).
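For illustration, a single input element might look like this (the timestamp and channel values are made up):

import java.util.ArrayList;
import java.util.Collections;
import org.apache.flink.api.java.tuple.Tuple2;

// one timestamp (epoch millis) on channel 7
Tuple2<ArrayList<Long>, Integer> element =
        new Tuple2<>(new ArrayList<>(Collections.singletonList(1546300800000L)), 7);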
I set the channel field as the key for the messages and create a Count Window of 400. For example, if we have 160000 messages as input, it should output 160000 / 400 = 400 rows, and the Count window works as desired. The problem is when I use the Sliding Count window; my expected behavior is:
Flink creates a logical window for every channel number and applies the ReduceFunction for the first time once the logical window's length reaches 400. After that, every 100 input elements with the same key as the logical window's key should call the ReduceFunction on the last 400 messages in the window. So we should have:
160000 - 400 = 159600   // the first 400 inputs call the reduce function for the first time
159600 / 100 = 1596     // after the first 400 inputs, every 100 inputs call the reduce function for the last 400 inputs
1 + 1596 = 1597         // the number of output rows
But running the Sliding Count window, it outputs 1600 rows of variable length. (I expected the length of every output to be just 400.)
Note: by length I mean the size of the ArrayList (the first field of the Tuple2).
How can I explain this behavior and implement my desired Sliding Count window?
Here is the source code:
DataStream<Tuple2<ArrayList<Long>, Integer>> data;
data.keyBy(1).countWindow(400, 100)
        .reduce(new ReduceFunction<Tuple2<ArrayList<Long>, Integer>>() {
            @Override
            public Tuple2<ArrayList<Long>, Integer> reduce(Tuple2<ArrayList<Long>, Integer> t0, Tuple2<ArrayList<Long>, Integer> t1) throws Exception {
                // merge t1's timestamp into t0's list (note: this mutates t0)
                t0.f0.add(t1.f0.get(0));
                return t0;
            }
        }).writeAsText("results400").setParallelism(1);
Update: Following @DavidAnderson's suggestion, I also tried creating a new Tuple in the ReduceFunction instead of modifying t0, but it resulted in the same output.
public Tuple2<ArrayList<Long>, Integer> reduce(Tuple2<ArrayList<Long>, Integer> t0, Tuple2<ArrayList<Long>, Integer> t1) throws Exception {
    ArrayList<Long> times = t0.f0; // still a reference to t0's list, so t0 is still mutated below
    times.addAll(t1.f0);
    return new Tuple2<>(times, t0.f1);
}
Upvotes: 1
Views: 1836
Reputation: 3427
Thanks to David Anderson's suggestion, modifying the ReduceFunction as follows solved the problem. We should create a new object in the ReduceFunction:
public Tuple2<ArrayList<Long>, Integer> reduce(Tuple2<ArrayList<Long>, Integer> t0, Tuple2<ArrayList<Long>, Integer> t1) throws Exception {
    // build a fresh list so that neither input tuple is mutated
    ArrayList<Long> times = new ArrayList<>();
    times.addAll(t0.f0);
    times.addAll(t1.f0);
    return new Tuple2<>(times, t0.f1);
}
Notice that both reduce approaches in the question produced incorrect output, because they mutate their input tuples. With this fix, the output is as expected.
So the Flink Sliding Count window behavior is that it calls the ReduceFunction (i.e., fires the window) once for every slide-many input messages. So in the case of 160000 input messages, the number of result rows should be:
160000 / 100 = 1600
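As for why the mutating versions produced variable-length rows: in the second attempt from the question, times is just another reference to t0.f0, so addAll still modifies the stored element (and with an evictor in place, Flink appears to re-apply the ReduceFunction to the stored elements on every firing, so the mutations accumulate). A minimal sketch outside Flink illustrating the aliasing (values are made up):

import java.util.ArrayList;
import java.util.Collections;
import org.apache.flink.api.java.tuple.Tuple2;

ArrayList<Long> list = new ArrayList<>(Collections.singletonList(1L));
Tuple2<ArrayList<Long>, Integer> t0 = new Tuple2<>(list, 7);
ArrayList<Long> times = t0.f0; // times and t0.f0 are the same object
times.add(2L);                 // this also changes t0.f0
System.out.println(t0.f0);     // prints [1, 2]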
Upvotes: 1
Reputation: 43707
This is the implementation of countWindow:
public WindowedStream<T, KEY, GlobalWindow> countWindow(long size, long slide) {
    return window(GlobalWindows.create())
            .evictor(CountEvictor.of(size))
            .trigger(CountTrigger.of(slide));
}
which does not behave in quite the way you expect. The window is triggered every 100 elements (the slide), whether or not it contains 400 elements (the size). The size only controls how many elements to keep, at most.
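If you really need the window to fire only once it contains 400 elements, and then every 100 elements after that, one option is to replace CountTrigger with a custom trigger. The sketch below is untested and the name DelayedCountTrigger is made up; it mirrors the structure of Flink's own CountTrigger but delays the first firing until size elements have arrived:

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;

public class DelayedCountTrigger<W extends Window> extends Trigger<Object, W> {
    private final long size;
    private final long slide;
    private final ReducingStateDescriptor<Long> countDesc =
            new ReducingStateDescriptor<>("count", new Sum(), LongSerializer.INSTANCE);

    private DelayedCountTrigger(long size, long slide) {
        this.size = size;
        this.slide = slide;
    }

    public static <W extends Window> DelayedCountTrigger<W> of(long size, long slide) {
        return new DelayedCountTrigger<>(size, slide);
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
        ReducingState<Long> count = ctx.getPartitionedState(countDesc);
        count.add(1L);
        long seen = count.get();
        // fire the first time at `size` elements, then every `slide` elements after that
        if (seen == size || (seen > size && (seen - size) % slide == 0)) {
            return TriggerResult.FIRE;
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, W window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(W window, TriggerContext ctx) throws Exception {
        ctx.getPartitionedState(countDesc).clear();
    }

    private static class Sum implements ReduceFunction<Long> {
        @Override
        public Long reduce(Long a, Long b) {
            return a + b;
        }
    }
}

You would then assemble the window by hand instead of calling countWindow, along these lines:

data.keyBy(1)
        .window(GlobalWindows.create())
        .evictor(CountEvictor.of(400))
        .trigger(DelayedCountTrigger.of(400, 100))
        .reduce(...);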
Upvotes: 2