John

Reputation: 11921

How does Flink treat checkpoints and state within IterativeStream?

I can see in the documentation that:

Flink currently only provides processing guarantees for jobs without iterations. Enabling checkpointing on an iterative job causes an exception. In order to force checkpointing on an iterative program the user needs to set a special flag when enabling checkpointing: env.enableCheckpointing(interval, force = true).

Please note that records in flight in the loop edges (and the state changes associated with them) will be lost during failure.

Does this refer to iterations within batch jobs or to iterative streams, or both?

If it refers to iterative streams, what state of the following operators would be available in the event of a failure? (The example is taken from this conversation about sharing state across operators by using a ConnectedIterativeStreams and closing the iteration with .closeWith(stream.broadcast()).)

DataStream<Point> input = ...
ConnectedIterativeStreams<Point, Centroids> inputsAndCentroids = input.iterate().withFeedbackType(Centroids.class);
DataStream<Centroids> updatedCentroids = inputsAndCentroids.flatMap(new MyCoFlatmap());
inputsAndCentroids.closeWith(updatedCentroids.broadcast());

class MyCoFlatmap implements CoFlatMapFunction<Point, Centroids, Centroids> {...}

Would there be any change if MyCoFlatmap were to be a CoProcessFunction instead of a CoFlatMapFunction (meaning it could hold state too)?

Upvotes: 2

Views: 752

Answers (1)

Till Rohrmann

Reputation: 13346

The limitation only applies to Flink's DataStream/Streaming API when using iterations. When using the DataSet/Batch API, there are no limitations.

When using streaming iterations, you do not actually lose operator state, but you might lose records that have been sent from an operator back to the iteration head via the loop edge. In your example, records sent from updatedCentroids back to inputsAndCentroids might be lost in case of a failure. Hence, Flink cannot provide exactly-once processing guarantees in this case.
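To make the difference between operator state and in-flight loop records concrete, here is a minimal toy simulation in plain Java. It does not use any Flink API: SummingOperator, loopEdge, and run are hypothetical names invented for this sketch, and the model assumes that a checkpoint captures the operator's state but not the records queued on the feedback edge.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model only: none of these names are Flink APIs.
final class LoopEdgeLossSketch {

    /** Stand-in for a stateful operator inside the loop; its state is a running sum. */
    static final class SummingOperator {
        private long sum;

        void process(long value) { sum += value; }

        /** The operator state that a checkpoint would capture. */
        long snapshot() { return sum; }

        static SummingOperator restore(long snapshot) {
            SummingOperator op = new SummingOperator();
            op.sum = snapshot;
            return op;
        }
    }

    /** Returns the final operator state, optionally simulating a failure right after the checkpoint. */
    static long run(boolean failAfterCheckpoint) {
        SummingOperator operator = new SummingOperator();
        // Records travelling back to the iteration head; assumed NOT part of the checkpoint.
        Deque<Long> loopEdge = new ArrayDeque<>();

        // Each input record updates the state and emits one feedback record.
        long[] inputs = {1L, 2L};
        for (long in : inputs) {
            operator.process(in);
            loopEdge.add(in * 10);
        }

        // Checkpoint: only the operator state is saved.
        long checkpoint = operator.snapshot();

        if (failAfterCheckpoint) {
            // Recovery restores the state, but the in-flight feedback records are gone.
            operator = SummingOperator.restore(checkpoint);
            loopEdge.clear();
        }

        // Drain whatever is still on the loop edge.
        while (!loopEdge.isEmpty()) {
            operator.process(loopEdge.poll());
        }
        return operator.snapshot();
    }

    public static void main(String[] args) {
        System.out.println("no failure:   " + run(false)); // 33
        System.out.println("with failure: " + run(true));  // 3
    }
}
```

In this sketch the restored state (3) is intact after the failure, but the two feedback records (10 and 20) never reach the operator, so the result differs from the failure-free run (33). That gap is why exactly-once cannot be promised for the loop.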

There is actually a Flink improvement proposal which addresses this shortcoming. However, it has not been finished yet.

Upvotes: 2
