Rechard Lin

Reputation: 43

Flink keyedstream generate duplicate results with same key and window timestamp

Here is my Flink job workflow:

DataStream<FlinkEvent> events = env.addSource( consumer )
    .flatMap( ... )
    .assignTimestampsAndWatermarks( new EventTsExtractor() );

DataStream<SessionStatEvent> sessionEvents = events
    .keyBy( new KeySelector<FlinkEvent, Tuple2<String, String>>() {
        @Override
        public Tuple2<String, String> getKey( FlinkEvent value ) throws Exception {
            return Tuple2.of( value.getF0(), value.getSessionID() );
        }
    } )
    .window( TumblingEventTimeWindows.of( Time.minutes( 2 ) ) )
    .allowedLateness( Time.seconds( 10 ) )
    .aggregate( new SessionStatAggregator(), new SessionStatProcessor() );
/* ... */
sessionEvents.addSink( esSinkBuilder.build() );

First I encountered

java.lang.Exception: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator

in the flatMap operator, and the task kept restarting. I observed many duplicate results with different values for the same key and window timestamp.

Q1: I guess the duplicates were caused by the downstream operators consuming messages repeatedly after the job restarted. Am I right? I resubmitted the job after fixing the ExceptionInChainedOperatorException problem. I observed duplicates in the first time window again, and after that the job seems to have worked correctly (one result per time window per key).

Q2: Where did the duplicates come from?

Upvotes: 4

Views: 1408

Answers (2)

David Anderson

Reputation: 43534

... there should be one result per key for one window

This is not (entirely) correct. Because of the allowedLateness, any late events (within the period of allowed lateness) will cause late (or in other words, extra) firings of the relevant windows. With the default EventTimeTrigger (which you appear to be using), each late event causes an additional window firing, and an updated window result will be emitted.
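This late-firing behavior can be sketched with a simplified simulation of the default EventTimeTrigger. This is plain Java, not the Flink API; the class and method names are invented for illustration, and it tracks a single key only:

```java
import java.util.*;

/** Toy model of EventTimeTrigger with allowedLateness (not Flink code).
 *  A window fires once when the watermark passes its end; each late event
 *  arriving within the lateness period causes an additional firing. */
public class LateFiringDemo {
    static final long WINDOW = 120_000;    // 2-minute tumbling window
    static final long LATENESS = 10_000;   // 10 seconds allowed lateness

    long watermark = Long.MIN_VALUE;
    Map<Long, List<Long>> contents = new HashMap<>();  // windowStart -> event timestamps
    Set<Long> alreadyFired = new HashSet<>();
    List<String> emissions = new ArrayList<>();        // what the downstream operator sees

    void fire(long start) {
        emissions.add("window@" + start + " count=" + contents.get(start).size());
        alreadyFired.add(start);
    }

    void onEvent(long ts) {
        long start = ts - (ts % WINDOW);
        if (watermark >= start + WINDOW + LATENESS) return;  // beyond lateness: dropped
        contents.computeIfAbsent(start, k -> new ArrayList<>()).add(ts);
        if (alreadyFired.contains(start)) fire(start);       // late firing: updated result emitted
    }

    void onWatermark(long wm) {
        watermark = wm;
        for (long start : contents.keySet())
            if (!alreadyFired.contains(start) && wm >= start + WINDOW) fire(start);
    }

    public static void main(String[] args) {
        LateFiringDemo d = new LateFiringDemo();
        d.onEvent(10_000);
        d.onEvent(50_000);
        d.onWatermark(125_000);  // watermark passes window end: first firing
        d.onEvent(100_000);      // late, but within lateness: second firing, same window
        d.emissions.forEach(System.out::println);
    }
}
```

Both emitted results carry the same key and window timestamp but different values, which matches what was observed. With the real API, routing late events to a side output via `sideOutputLateData(...)` instead of using `allowedLateness(...)` avoids the extra firings.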

Upvotes: 3

bottaio

Reputation: 5093

This is how Flink achieves exactly-once semantics. In case of failures Flink replays events from the last successful checkpoint. It is important to note that exactly-once means affecting state once, not processing / publishing events exactly once.

Answering Q1: yes, each restart resulted in processing the same messages over and over again.

Answering Q2: the first window after your bug fix processed these messages again; after that, everything went back to normal.
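The replay-after-failure mechanism can be illustrated with a toy simulation. This is plain Java, not Flink; the offsets and checkpoint positions are made up for illustration:

```java
import java.util.ArrayList;
import java.util.List;

/** Toy replay simulation (not Flink code): after a failure, the source
 *  rewinds to the offset stored by the last successful checkpoint, so
 *  records read since that checkpoint are emitted to the sink twice. */
public class ReplayDemo {

    static List<Integer> run() {
        List<Integer> input = List.of(1, 2, 3, 4, 5, 6);
        List<Integer> sinkOutput = new ArrayList<>();
        int checkpointOffset = 0;      // offset stored by the last successful checkpoint
        boolean failedOnce = false;
        int pos = 0;
        while (pos < input.size()) {
            if (pos == 2) checkpointOffset = pos;  // a checkpoint completes here
            if (pos == 4 && !failedOnce) {         // the job crashes mid-stream...
                failedOnce = true;
                pos = checkpointOffset;            // ...and restarts from the checkpoint
                continue;
            }
            sinkOutput.add(input.get(pos));        // a non-transactional sink sees every emission
            pos++;
        }
        return sinkOutput;
    }

    public static void main(String[] args) {
        System.out.println(run());  // records 3 and 4 appear twice
    }
}
```

Internal state (counters, window contents) is restored from the checkpoint and is therefore affected exactly once, but a sink that simply appends output, like the Elasticsearch sink here, will receive the replayed records again unless it deduplicates (e.g. by using a deterministic document ID).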

Upvotes: 3
