Reputation: 43
Here is my Flink job workflow:
DataStream<FlinkEvent> events = env
    .addSource( consumer )
    .flatMap(...)
    .assignTimestampsAndWatermarks( new EventTsExtractor() );
DataStream<SessionStatEvent> sessionEvents = events
    .keyBy( new KeySelector<FlinkEvent, Tuple2<String, String>>() {
        @Override
        public Tuple2<String, String> getKey( FlinkEvent value ) throws Exception {
            return Tuple2.of( value.getF0(), value.getSessionID() );
        }
    } )
    .window( TumblingEventTimeWindows.of( Time.minutes( 2 ) ) )
    .allowedLateness( Time.seconds( 10 ) )
    .aggregate( new SessionStatAggregator(), new SessionStatProcessor() );
/* ... */
sessionEvents.addSink( esSinkBuilder.build() );
First I encountered
java.lang.Exception: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
in the flatMap operator, and the task kept restarting. I observed many duplicate results with different values for the same key and window timestamp.
Q1: I guess the duplicates were caused by the downstream operators consuming the same messages again after the job restarted. Am I right?
I resubmitted the job after fixing the ExceptionInChainedOperatorException problem. I observed duplicates in the first time window again, and after that the job seemed to work correctly (one result per key per time window).
Q2: Where did the duplicates come from?
Upvotes: 4
Views: 1408
Reputation: 43534
... there should be one result per key for one window
This is not (entirely) correct. Because of the allowedLateness, any late events (within the period of allowed lateness) will cause late (or in other words, extra) firings of the relevant windows. With the default EventTimeTrigger (which you appear to be using), each late event causes an additional window firing, and an updated window result will be emitted.
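If you want exactly one (final) result per key and window, one option is to drop the allowedLateness and collect late events in a side output instead of re-firing the window. Below is a minimal sketch reusing the operator names from the question (SessionStatAggregator, SessionStatProcessor, and the key selector are taken from your code; the "late-events" tag name is just an example):
final OutputTag<FlinkEvent> lateTag = new OutputTag<FlinkEvent>( "late-events" ) {};

SingleOutputStreamOperator<SessionStatEvent> sessionEvents = events
    .keyBy( new KeySelector<FlinkEvent, Tuple2<String, String>>() {
        @Override
        public Tuple2<String, String> getKey( FlinkEvent value ) throws Exception {
            return Tuple2.of( value.getF0(), value.getSessionID() );
        }
    } )
    .window( TumblingEventTimeWindows.of( Time.minutes( 2 ) ) )
    // no allowedLateness(): each window fires exactly once when the watermark passes its end
    .sideOutputLateData( lateTag )
    .aggregate( new SessionStatAggregator(), new SessionStatProcessor() );

// Late events are not dropped silently; they can be inspected or handled separately:
DataStream<FlinkEvent> lateEvents = sessionEvents.getSideOutput( lateTag );
The trade-off: with allowedLateness you get progressively updated (and thus multiple) results per window; with a side output you get one final result per window plus a separate stream of late events to handle as you see fit.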
Upvotes: 3
Reputation: 5093
This is how Flink achieves exactly-once semantics. In case of failure, Flink replays events from the last successful checkpoint. It is important to note that exactly-once means affecting state exactly once, not processing / publishing events exactly once.
Answering Q1: yes, each restart resulted in processing the same messages over and over again.
Answering Q2: the first window after your bug fix processed those messages again; then everything went back to normal.
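For reference, replay on failure only happens when checkpointing is enabled. A minimal sketch (the 60-second interval is an arbitrary assumption, not something from your job):
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Take a checkpoint every 60 s; on failure Flink rewinds the source
// (e.g. Kafka consumer offsets) to the last completed checkpoint and replays from there.
env.enableCheckpointing( 60_000L, CheckpointingMode.EXACTLY_ONCE );
To keep those replays from showing up as duplicates in Elasticsearch, a common approach is to write each result with a deterministic document ID (for example, key plus window start timestamp), so a replayed write overwrites the previous document instead of adding a new one.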
Upvotes: 3