Soheil Pourbafrani

Reputation: 3427

Flink Sliding count window behavior

Suppose we have the following data structure:

Tuple2<ArrayList<Long>, Integer>

The first field is an ArrayList of length one containing a timestamp, and the Integer field is a number between 1 and 40 named channel. The goal is to aggregate every 400 messages with the same key (channel) and apply a ReduceFunction to them (it just merges the timestamps of the 400 messages into the first field of the tuple). I set the channel field as the key for the messages and created a count window of 400. For example, with 160000 input messages it should output 160000/400 = 400 rows, and the count window works as desired. The problem is with the sliding count window. My expected behavior is:

Flink creates a logical window for every channel number and applies the ReduceFunction for the first time when that window's length reaches 400. After that, every 100 new input messages with the same key as the window call the ReduceFunction again on the last 400 messages in the window. So every output should contain exactly 400 timestamps.
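Because count windows are kept per key, the expected numbers can be worked out per channel. The sketch below assumes, for illustration only, that the 160000 messages are spread evenly over the 40 channels (4000 each); the class and method names are hypothetical. Under that assumption a tumbling count window of 400 gives 10 outputs per channel (400 in total), while the sliding behavior described above would give one output at 400, 500, ..., 4000 messages, i.e. 37 per channel (1480 in total), each with 400 timestamps.

```java
public class CountWindowArithmetic {

    /** Tumbling countWindow(size): one output per full group of `size` messages of a key. */
    static long tumblingOutputsPerKey(long perKey, long size) {
        return perKey / size;
    }

    /** Expected sliding behavior from the question: first output at `size`, then one every `slide`. */
    static long expectedSlidingOutputsPerKey(long perKey, long size, long slide) {
        return perKey < size ? 0 : (perKey - size) / slide + 1;
    }

    public static void main(String[] args) {
        long channels = 40;
        long perKey = 160_000 / channels; // assumption: messages spread evenly over the channels
        System.out.println("tumbling: " + tumblingOutputsPerKey(perKey, 400) * channels);
        System.out.println("expected sliding: "
                + expectedSlidingOutputsPerKey(perKey, 400, 100) * channels);
    }
}
```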

But when running the sliding count window, it outputs 1600 rows of variable length (I expected every output to have length 400).

Note: by "length" I mean the size of the ArrayList (the first field of the Tuple2).

How can this behavior be explained, and how can I implement my desired sliding count window?

Here is the source code:

DataStream<Tuple2<ArrayList<Long>, Integer>> data; // the input stream, created elsewhere
data.keyBy(1)                // key by the channel field (f1)
    .countWindow(400, 100)   // size 400, slide 100
    .reduce(new ReduceFunction<Tuple2<ArrayList<Long>, Integer>>() {
        @Override
        public Tuple2<ArrayList<Long>, Integer> reduce(Tuple2<ArrayList<Long>, Integer> t0,
                                                       Tuple2<ArrayList<Long>, Integer> t1) throws Exception {
            // append t1's timestamp to t0's list and return t0
            t0.f0.add(t1.f0.get(0));
            return t0;
        }
    })
    .writeAsText("results400").setParallelism(1);
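The following is a simplified plain-Java model of what this pipeline does for a single key. It assumes that countWindow(400, 100) fires every 100 elements and trims the window to the last 400 before reducing, and that the window keeps references to the stored element objects between firings (as a heap-based state backend may; this is an assumption, not something the question confirms). Rec and MutatingReduceSketch are hypothetical stand-ins, Rec playing the role of Tuple2<ArrayList<Long>, Integer>. Because the reduce appends to t0, the first stored element keeps accumulating timestamps across firings, so the output lengths can exceed 400.

```java
import java.util.ArrayList;
import java.util.List;

public class MutatingReduceSketch {

    /** Hypothetical stand-in for Flink's Tuple2<ArrayList<Long>, Integer>. */
    static final class Rec {
        final ArrayList<Long> f0;
        final int f1;
        Rec(ArrayList<Long> f0, int f1) { this.f0 = f0; this.f1 = f1; }
    }

    /** The question's reduce: appends to t0's list and returns t0 itself. */
    static Rec reduce(Rec t0, Rec t1) {
        t0.f0.add(t1.f0.get(0));
        return t0;
    }

    /**
     * Simplified model of one key of countWindow(size, slide): fire every `slide` elements,
     * keep at most the last `size`, and reuse the stored element objects between firings
     * (assumption: the window state holds references, not copies).
     */
    static List<Integer> run(int messages, int size, int slide) {
        List<Rec> window = new ArrayList<>();
        List<Integer> outputLengths = new ArrayList<>();
        int count = 0;
        for (int i = 0; i < messages; i++) {
            ArrayList<Long> ts = new ArrayList<>();
            ts.add((long) i);                      // one timestamp per message
            window.add(new Rec(ts, 1));
            if (++count < slide) {
                continue;
            }
            count = 0;                             // the trigger's counter restarts after firing
            while (window.size() > size) {
                window.remove(0);                  // evict the oldest elements
            }
            Rec curr = window.get(0);              // reducing starts from the stored first element
            for (int j = 1; j < window.size(); j++) {
                curr = reduce(curr, window.get(j));
            }
            outputLengths.add(curr.f0.size());
        }
        return outputLengths;
    }

    public static void main(String[] args) {
        System.out.println(run(1000, 400, 100));
    }
}
```

In this model, 1000 messages for one key give the lengths 100, 299, 598, 997 and then 400 from the fifth firing on, once the mutated first element has been evicted.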

Update: Following @DavidAnderson's suggestion, I also tried creating a new Tuple in the ReduceFunction instead of modifying t0, but it resulted in the same output.

public Tuple2<ArrayList<Long>, Integer> reduce(Tuple2<ArrayList<Long>, Integer> t0,
                                               Tuple2<ArrayList<Long>, Integer> t1) throws Exception {
    ArrayList<Long> times = t0.f0; // the same list object as t0.f0, not a copy
    times.addAll(t1.f0);
    return new Tuple2<>(times, t0.f1);
}
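This variant still changes its input: times is the same ArrayList object as t0.f0, so addAll grows t0's list even though a new tuple is returned. A minimal plain-Java check, with a hypothetical Rec class standing in for Flink's Tuple2:

```java
import java.util.ArrayList;
import java.util.Arrays;

public class AliasingSketch {

    /** Hypothetical stand-in for Flink's Tuple2<ArrayList<Long>, Integer>. */
    static final class Rec {
        final ArrayList<Long> f0;
        final int f1;
        Rec(ArrayList<Long> f0, int f1) { this.f0 = f0; this.f1 = f1; }
    }

    /** The second variant from the question: a new tuple, but t0's list is reused. */
    static Rec reduce(Rec t0, Rec t1) {
        ArrayList<Long> times = t0.f0; // alias, not a copy
        times.addAll(t1.f0);
        return new Rec(times, t0.f1);
    }

    public static void main(String[] args) {
        Rec first = new Rec(new ArrayList<>(Arrays.asList(1L)), 7);
        Rec second = new Rec(new ArrayList<>(Arrays.asList(2L)), 7);
        Rec result = reduce(first, second);
        System.out.println(first.f0);              // prints [1, 2]: the input was modified
        System.out.println(result.f0 == first.f0); // prints true: same list object
    }
}
```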

Upvotes: 1

Views: 1836

Answers (2)

Soheil Pourbafrani

Reputation: 3427

Thanks to David Anderson's suggestion, modifying the ReduceFunction as follows solves the problem. The key is to create new objects in the ReduceFunction, including a new ArrayList, instead of modifying the inputs:

public Tuple2<ArrayList<Long>, Integer> reduce(Tuple2<ArrayList<Long>, Integer> t0,
                                               Tuple2<ArrayList<Long>, Integer> t1) throws Exception {
    // build a fresh list so that neither t0 nor t1 is modified
    ArrayList<Long> times = new ArrayList<>();
    times.addAll(t0.f0);
    times.addAll(t1.f0);

    return new Tuple2<>(times, t0.f1);
}

Note that both reduce approaches in the question produce incorrect output, because both modify t0.f0, the list of an element the window may still hold for later firings. Now the output looks like this:

  • The first 40 rows (one per channel) --> length 100
  • The next 40 rows --> length 200
  • The next 40 rows --> length 300
  • All remaining rows --> length 400

So the Flink sliding count window calls the ReduceFunction every time a key receives a slide's worth of new messages (here 100), whether or not the window is full. With 160000 input messages, the number of results is therefore 160000/100 = 1600.
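These counts can be reproduced with a simplified plain-Java model of keyBy(channel).countWindow(400, 100) using the fixed reduce. The class FixedReduceSketch, the stand-in Rec type, and the round-robin assignment of messages to 40 channels are assumptions for illustration: the model fires every 100 elements per key and trims the window to at most 400 elements before reducing.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class FixedReduceSketch {

    /** Hypothetical stand-in for Flink's Tuple2<ArrayList<Long>, Integer>. */
    static final class Rec {
        final ArrayList<Long> f0;
        final int f1;
        Rec(ArrayList<Long> f0, int f1) { this.f0 = f0; this.f1 = f1; }
    }

    /** The fixed reduce: builds a new list and never touches t0 or t1. */
    static Rec reduce(Rec t0, Rec t1) {
        ArrayList<Long> times = new ArrayList<>();
        times.addAll(t0.f0);
        times.addAll(t1.f0);
        return new Rec(times, t0.f1);
    }

    /** Simplified model of keyBy(channel).countWindow(size, slide); returns output lengths in order. */
    static List<Integer> run(int messages, int channels, int size, int slide) {
        Map<Integer, List<Rec>> windows = new HashMap<>();
        Map<Integer, Integer> counts = new HashMap<>();
        List<Integer> outputLengths = new ArrayList<>();
        for (int i = 0; i < messages; i++) {
            int channel = 1 + (i % channels);     // assumption: round-robin over channels
            ArrayList<Long> ts = new ArrayList<>();
            ts.add((long) i);
            List<Rec> window = windows.computeIfAbsent(channel, k -> new ArrayList<>());
            window.add(new Rec(ts, channel));
            int count = counts.merge(channel, 1, Integer::sum);
            if (count < slide) {
                continue;
            }
            counts.put(channel, 0);               // fire every `slide` elements of this key
            while (window.size() > size) {
                window.remove(0);                 // keep at most `size` elements
            }
            Rec curr = window.get(0);
            for (int j = 1; j < window.size(); j++) {
                curr = reduce(curr, window.get(j));
            }
            outputLengths.add(curr.f0.size());
        }
        return outputLengths;
    }

    public static void main(String[] args) {
        List<Integer> out = run(160_000, 40, 400, 100);
        System.out.println(out.size());
        System.out.println(out.get(0) + " " + out.get(40) + " " + out.get(80) + " " + out.get(120));
    }
}
```

In this model the main method prints 1600 and then 100 200 300 400: rows 0 to 39 have length 100, rows 40 to 79 length 200, rows 80 to 119 length 300, and all later rows length 400. The fixed reduce copies the accumulated list at every step, so its cost grows quadratically with the window size; that is fine for 400 elements but worth keeping in mind for larger windows.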

Upvotes: 1

David Anderson
David Anderson

Reputation: 43707

This is the implementation of countWindow(size, slide) in KeyedStream:

public WindowedStream<T, KEY, GlobalWindow> countWindow(long size, long slide) {
    return window(GlobalWindows.create())
            .evictor(CountEvictor.of(size))
            .trigger(CountTrigger.of(slide));
}

which does not behave quite the way you expect. The window is triggered every 100 elements (the slide), whether or not it contains 400 elements (the size). The size only controls the maximum number of elements to keep.
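Following that description, the window handed to the function at the k-th firing for a key holds min(k * slide, size) elements. The hypothetical helper below, a plain-Java sketch assuming 4000 messages per channel, lists those sizes and counts how many are full. Only the first three firings per channel see fewer than 400 elements, so one possible way to get the desired output (a suggestion, not part of the original answer) is to keep the fixed ReduceFunction and drop the shorter results afterwards, for example with .filter(t -> t.f0.size() == 400) after the reduce.

```java
import java.util.ArrayList;
import java.util.List;

public class TriggerEvictorSketch {

    /**
     * Window sizes seen by the function for one key under countWindow(size, slide):
     * a firing after every `slide` elements, with the window trimmed to at most `size`.
     */
    static List<Long> firedWindowSizes(long elementsForKey, long size, long slide) {
        List<Long> sizes = new ArrayList<>();
        for (long seen = slide; seen <= elementsForKey; seen += slide) {
            sizes.add(Math.min(seen, size));
        }
        return sizes;
    }

    public static void main(String[] args) {
        List<Long> sizes = firedWindowSizes(4000, 400, 100); // assumption: 4000 messages per channel
        long full = sizes.stream().filter(s -> s == 400).count();
        System.out.println(sizes.size() + " firings, " + full + " with exactly 400 elements");
    }
}
```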

Upvotes: 2
