Ajith Nayak

Reputation: 21

Windowing not completing its window length

I have been trying out examples of Flink windowing, and to verify the window timing I added a timestamp to each stream event. I found that the duration of the window was shorter than the configured window length. Also, if I use a sliding window and modify an event, the modified event shows up in the next window.

When I specify a window length, does Flink not wait for that full length before the window completes? And do the overlapping events in sliding windows refer to the same instance? (I'm aware that streams are immutable structures.)

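import java.util.Iterator;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.util.Collector;

// Alarm, AlarmSchema and PropertyLoader are my own classes (not shown here).
public class WindowDemo {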

public static void main(String[] args) {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

    Properties prop=PropertyLoader.loadPropertiesForConsumer("WC",0);
    FlinkKafkaConsumer09<Alarm> consumer= new FlinkKafkaConsumer09<Alarm>("topic_smartEmse", new AlarmSchema(), prop);
    DataStream<Alarm> inputStream= env.addSource(consumer);

    inputStream= inputStream.flatMap(new FlatMapFunction<Alarm, Alarm>() {

        @Override
        public void flatMap(Alarm value, Collector<Alarm> out)
                throws Exception {
            System.out.println("flatMap Started at "+System.currentTimeMillis());
            value.setUserDefined10("IN TIME "+System.currentTimeMillis());
            out.collect(value);
            System.out.println("flatMap Ended at "+System.currentTimeMillis());
        }
    });

    KeyedStream<Alarm, String> keyedStream= inputStream.keyBy(new KeySelector<Alarm, String>(){

        @Override
        public String getKey(Alarm value) throws Exception {
            System.out.println("getKey Started at "+System.currentTimeMillis());
            return "XX";
        }});

    DataStream<Alarm> dataStream= keyedStream.timeWindow(Time.of(90, TimeUnit.SECONDS)).apply(new WindowFunction<Alarm, Alarm, String, TimeWindow>() {

        @Override
        public void apply(String key, TimeWindow window,
                Iterable<Alarm> input, Collector<Alarm> out)
                throws Exception {
            System.out.println("timeWindow Started at "+System.currentTimeMillis());
            int count=0;
            System.out.println("Key : "+key);
            System.out.println("Values : "+input);
            Iterator<Alarm> itr= input.iterator();
            while (itr.hasNext()){
                Alarm alarm= itr.next();
                alarm.setUserDefined1(""+count++);

                out.collect(alarm);
            }
            System.out.println("timeWindow ended at "+System.currentTimeMillis());

        }
    });

    dataStream= dataStream.flatMap(new FlatMapFunction<Alarm, Alarm>() {

        @Override
        public void flatMap(Alarm value, Collector<Alarm> out)
                throws Exception {
            value.setUserDefined11("OUT TIME "+System.currentTimeMillis());
            out.collect(value);
        }
    });
    dataStream.printToErr();
    try {
        env.execute();
    } catch (Exception e) {
        e.printStackTrace();
    }
}
}

Upvotes: 1

Views: 89

Answers (1)

TobiSH

Reputation: 2921

If I understand you correctly, your concern is that the window is evaluated (apply is called) before the given time frame has elapsed. I noticed the same effect for the first evaluation of the window. It seems that the time slots are aligned to multiples of the window size: I started the processing at 19:09:13, and the window was first evaluated at 19:10:30, so after 77 seconds. After this first call, the window closed roughly (not exactly) every 90 seconds.

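For TumblingProcessingTimeWindows, the alignment seems to come from the code below. (Since your job sets IngestionTime, timeWindow should actually pick the event-time assigner, TumblingEventTimeWindows, but as far as I can tell it aligns its windows in the same way.)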

public class TumblingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> {

    private long size;

    private TumblingProcessingTimeWindows(long size) {
        this.size = size;
    }

    @Override
    public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {

        final long now = context.getCurrentProcessingTime();
        // here goes the alignment 
        long start = now - (now % size);
        return Collections.singletonList(new TimeWindow(start, start + size));
    }

    // ... remaining members of the class omitted
}
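
To check the alignment against the times I observed, here is a minimal sketch that replays the same arithmetic outside Flink. The class WindowAlignmentDemo and its method names are made up for illustration and are not part of Flink. It uses milliseconds since midnight as a stand-in for the processing time, which should give the same alignment as long as the clock's offset from UTC is a multiple of 90 seconds (a day is exactly 960 windows of 90 seconds):

```java
import java.time.LocalTime;

// Hypothetical helper class, not part of Flink: it only replays the
// "start = now - (now % size)" arithmetic from the assigner excerpt.
public class WindowAlignmentDemo {

    // Round "now" down to the nearest multiple of the window size.
    public static long windowStart(long nowMillis, long sizeMillis) {
        return nowMillis - (nowMillis % sizeMillis);
    }

    // The window ends (and is evaluated) one window size after its aligned start.
    public static long windowEnd(long nowMillis, long sizeMillis) {
        return windowStart(nowMillis, sizeMillis) + sizeMillis;
    }

    public static void main(String[] args) {
        long size = 90_000L; // 90-second window, in milliseconds

        // Assumption: milliseconds since midnight stand in for the real clock.
        long now = LocalTime.of(19, 9, 13).toSecondOfDay() * 1000L;

        long start = windowStart(now, size);
        long end = windowEnd(now, size);

        System.out.println("window start: " + LocalTime.ofSecondOfDay(start / 1000));
        System.out.println("window end:   " + LocalTime.ofSecondOfDay(end / 1000));
        System.out.println("first wait:   " + (end - now) / 1000 + " s");
    }
}
```

For a start at 19:09:13 the aligned window begins at 19:09:00 and ends at 19:10:30, so the first evaluation comes after 77 seconds rather than 90, which matches what I saw. Every later window then spans the full 90 seconds.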

Does this make sense to you?

Upvotes: 1
