GirishB

Reputation: 144

Apache Flink: How to close a fixed-size window when no data is received for a certain period of time

I am trying to calculate the rate of incoming events per minute from a Kafka topic, based on event time, using TumblingEventTimeWindows of 1 minute. The code snippet is given below. I have observed that if no event arrives for a particular window, e.g. from 2:34 to 2:35, then the previous window, 2:33 to 2:34, does not close. I understand the risk of losing data for the 2:33 to 2:34 window (this may happen due to system failure, large Kafka lag, etc.), but I cannot wait indefinitely. I need to close this window after waiting for a certain period of time, and subsequent windows can continue after the system recovers. How can I achieve this?

The following code produces the event count per minute as long as events flow continuously.

    StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
    executionEnvironment.setRestartStrategy(RestartStrategies.fixedDelayRestart(
            3,
            org.apache.flink.api.common.time.Time.of(10, TimeUnit.SECONDS)
    ));
    executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    executionEnvironment.setParallelism(1);
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "localhost:9092");
    properties.setProperty("group.id", "AllEventCountConsumerGroup");
    FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>("event_input_topic", new SimpleStringSchema(), properties);
    DataStreamSource<String> kafkaDataStream = executionEnvironment.addSource(kafkaConsumer);
    kafkaDataStream
            .flatMap(new EventFlatter())
            .filter(Objects::nonNull)
            .assignTimestampsAndWatermarks(WatermarkStrategy
                    .<Entity>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                    .withTimestampAssigner((SerializableTimestampAssigner<Entity>) (element, recordTimestamp) -> element.getTimestamp()))
            .keyBy((KeySelector<Entity, String>) Entity::getTenant)
            .window(TumblingEventTimeWindows.of(Time.minutes(1)))
            .allowedLateness(Time.seconds(10))
            .aggregate(new EventCountAggregator())
            .addSink(eventRateProducer);

Upvotes: 1

Views: 1126

Answers (2)

Saurabh Gangamwar

Reputation: 802

Based on @DavidAnderson's suggestions, I have implemented a custom watermark strategy that advances the watermark after detecting idleness.

import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.util.Preconditions;

import java.time.Duration;
import java.time.Instant;
import java.time.ZoneOffset;

public class CustomBoundedOutOfOrdernessWatermarks<T> implements WatermarkGenerator<T> {
    private long maxTimestamp;
    private final long outOfOrdernessMillis;
    private long lastMaxTimestampCalculatedAt;
    private long sourceMaxIdlenessMillis;

    private long lastEmittedWatermark = Long.MIN_VALUE;

    public CustomBoundedOutOfOrdernessWatermarks(Duration maxOutOfOrderness, Duration sourceIdleness) {
        Preconditions.checkNotNull(maxOutOfOrderness, "maxOutOfOrderness");
        Preconditions.checkArgument(!maxOutOfOrderness.isNegative(), "maxOutOfOrderness cannot be negative");
        Preconditions.checkNotNull(sourceIdleness, "sourceIdleness");
        Preconditions.checkArgument(!sourceIdleness.isNegative(), "sourceIdleness cannot be negative");
        this.outOfOrdernessMillis = maxOutOfOrderness.toMillis();
        this.sourceMaxIdlenessMillis = sourceIdleness.toMillis();
        this.maxTimestamp = Long.MIN_VALUE + this.outOfOrdernessMillis + 1L;
    }

    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        if(eventTimestamp > maxTimestamp) {
            maxTimestamp = eventTimestamp;
        }
        lastMaxTimestampCalculatedAt = System.currentTimeMillis();
    }

    public void onPeriodicEmit(WatermarkOutput output) {
        long potentialWM = maxTimestamp - outOfOrdernessMillis - 1L;

        if(potentialWM > lastEmittedWatermark) {
            lastEmittedWatermark = potentialWM;
        } else if ((System.currentTimeMillis() - lastMaxTimestampCalculatedAt) > sourceMaxIdlenessMillis) {
            // Idle source: round maxTimestamp down to a 10-second boundary (10000 ms),
            // then advance the watermark outOfOrdernessMillis + 1 ms past that boundary
            potentialWM = ((maxTimestamp / 10000) * 10000) + outOfOrdernessMillis + 1L;
            lastEmittedWatermark = Math.max(lastEmittedWatermark,  potentialWM);
            System.out.println("onPeriodicEmit::lastEmittedWatermark:trailed "+ Instant.ofEpochMilli(lastEmittedWatermark).atZone(ZoneOffset.UTC));
        }
        output.emitWatermark(new Watermark(this.lastEmittedWatermark));
    }
}

usage:

WatermarkStrategy<MyEvent> wm2Strategy = (ctx -> new CustomBoundedOutOfOrdernessWatermarks<MyEvent>(Duration.ofSeconds(10), Duration.ofSeconds(20)));
wm2Strategy = wm2Strategy
            .withTimestampAssigner((event, timestamp) -> event.getEventTimestampMillis())
            .withIdleness(Duration.ofSeconds(20));


DataStream<MyEvent> eventsDataStream = env.fromSource(kafkaSource, wm2Strategy,"source");

Upvotes: 0

David Anderson

Reputation: 43499

Given forBoundedOutOfOrderness(Duration.ofSeconds(2)), a window for the interval [t, t + 1 minute) won't close until after an event with timestamp >= t + 1 minute + 2 seconds is processed.
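
To make that arithmetic concrete, here is a minimal sketch in plain Java with no Flink dependency. It assumes that the built-in bounded-out-of-orderness generator emits a watermark of `maxTimestamp - outOfOrderness - 1 ms`, and that an event-time window `[start, end)` fires once the watermark reaches `end - 1 ms`. The class and method names are hypothetical and exist only for this illustration.

```java
import java.time.Duration;

// Hypothetical helper, not part of Flink: models when a tumbling event-time window fires.
class WindowCloseModel {

    // Watermark a bounded-out-of-orderness generator would emit after seeing maxTimestampMillis
    // (assumed formula: maxTimestamp - bound - 1 ms).
    static long watermarkFor(long maxTimestampMillis, Duration outOfOrderness) {
        return maxTimestampMillis - outOfOrderness.toMillis() - 1L;
    }

    // A window [start, end) is assumed to fire once the watermark reaches end - 1 ms.
    static boolean windowFires(long windowEndMillis, long watermarkMillis) {
        return watermarkMillis >= windowEndMillis - 1L;
    }

    // Smallest event timestamp that makes the window ending at windowEndMillis fire.
    static long minTimestampToFire(long windowEndMillis, Duration outOfOrderness) {
        return windowEndMillis + outOfOrderness.toMillis();
    }

    public static void main(String[] args) {
        Duration bound = Duration.ofSeconds(2);
        long windowEnd = Duration.ofMinutes(1).toMillis(); // window [0, 60000)

        long justTooEarly = windowEnd + 1_999L;                  // t + 1 min + 1.999 s
        long justEnough = minTimestampToFire(windowEnd, bound);  // t + 1 min + 2 s

        System.out.println(windowFires(windowEnd, watermarkFor(justTooEarly, bound))); // false
        System.out.println(windowFires(windowEnd, watermarkFor(justEnough, bound)));   // true
    }
}
```

Under these assumptions, an event stamped 1.999 seconds past the window end leaves the window open, while one stamped exactly 2 seconds past it lets the window fire. If no such event ever arrives, the watermark never moves and the window stays open.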

If your input stream can have long periods of idleness, and you can't wait until the stream resumes, then you'll have to either artificially advance the watermark after detecting idleness, or use a custom window Trigger that uses a combination of both event-time and processing-time timers.
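
The second option can be reasoned about without Flink. The sketch below is a plain-Java model, not a Flink `Trigger` implementation, of the firing rule such a trigger might apply: fire when the watermark passes the end of the window, or when a wall-clock timeout elapses after the last element, whichever happens first. In a real trigger the two checks would roughly correspond to an event-time timer and a processing-time timer. All names here are hypothetical, and measuring the timeout from the last element's arrival is an assumption made for this illustration.

```java
// Hypothetical plain-Java model of a window that fires on event time OR after a
// processing-time timeout. It is not a Flink Trigger; names are illustrative only.
class TimeoutFiringModel {
    private final long windowEndMillis;            // exclusive end of the window
    private final long idleTimeoutMillis;          // wall-clock wait after the last element
    private long deadlineMillis = Long.MAX_VALUE;  // no deadline until an element arrives
    private boolean fired = false;

    TimeoutFiringModel(long windowEndMillis, long idleTimeoutMillis) {
        this.windowEndMillis = windowEndMillis;
        this.idleTimeoutMillis = idleTimeoutMillis;
    }

    // Called when an element is added; pushes the wall-clock deadline forward.
    void onElement(long nowMillis) {
        deadlineMillis = nowMillis + idleTimeoutMillis;
    }

    // Event-time path: returns true if this watermark makes the window fire.
    boolean onWatermark(long watermarkMillis) {
        return fireIf(watermarkMillis >= windowEndMillis - 1L);
    }

    // Processing-time path: returns true if the idle timeout makes the window fire.
    boolean onProcessingTime(long nowMillis) {
        return fireIf(nowMillis >= deadlineMillis);
    }

    // Fires at most once, so a late watermark does not produce a second result.
    private boolean fireIf(boolean condition) {
        if (fired || !condition) {
            return false;
        }
        fired = true;
        return true;
    }

    public static void main(String[] args) {
        TimeoutFiringModel window = new TimeoutFiringModel(60_000L, 30_000L);
        window.onElement(1_000L);                              // last element at wall-clock 1 s
        System.out.println(window.onWatermark(50_000L));       // false: watermark too low
        System.out.println(window.onProcessingTime(20_000L));  // false: deadline is 31 s
        System.out.println(window.onProcessingTime(31_000L));  // true: timeout reached
        System.out.println(window.onWatermark(59_999L));       // false: already fired
    }
}
```

The trade-off in this model is the one the question already accepts: a window closed by the timeout may miss events that arrive later, for example after a Kafka lag clears.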

For a watermark generator that detects idleness, here's an example, but it hasn't been updated to the new WatermarkStrategy API.

Upvotes: 2
