Nitish Kumar

Reputation: 51

Flink unable to restore operator state for a Kafka consumer when starting from a checkpoint

We have a streaming job with 20 separate pipelines, each of which has one or more Kafka topic sources.

We are seeing unexpected behavior when restoring operator state from a checkpoint after restarting the job with a new jar (I added one more pipeline) and AllowNonRestoredState=true.

Flink version: 1.13.0

Is this a known issue in Flink?

Note: In some cases, the same topic is consumed in two pipelines with different consumer groups. As I understand it, this should not affect state restoration, since the Kafka union state belongs to each pipeline's own Kafka consumer source.

Upvotes: 1

Views: 840

Answers (1)

David Anderson

Reputation: 43454

Since you haven't explicitly provided UIDs for the operators, you are relying on automatically generated UIDs. These are only stable so long as the job's topology remains unchanged. When you add a new pipeline, this may change some or all of the previously auto-generated UIDs, and make that state unrestorable.

If you want to ensure you will be able to restore state, set UIDs on all stateful operators. See Flink's Production Readiness Checklist for details.
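As a rough illustration of what setting UIDs looks like, here is a minimal sketch assuming the FlinkKafkaConsumer source from flink-connector-kafka on Flink 1.13. The topic name, group ids, broker address, class name, and UID strings are all made up for the example; the only requirement is that each UID is unique within the job and stays the same across deployments.

```java
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class PipelinesWithUids {

    // Hypothetical helper: broker address and group ids are placeholders.
    private static Properties kafkaProps(String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", groupId);
        return props;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Pipeline A: the explicit uid keeps this source's state addressable
        // even if other pipelines are added to or removed from the job.
        DataStream<String> pipelineA = env
                .addSource(new FlinkKafkaConsumer<>(
                        "orders", new SimpleStringSchema(), kafkaProps("group-a")))
                .uid("pipeline-a-orders-source")
                .name("pipeline-a orders source");

        // Pipeline B reads the same topic with a different group,
        // so it gets its own distinct uid.
        DataStream<String> pipelineB = env
                .addSource(new FlinkKafkaConsumer<>(
                        "orders", new SimpleStringSchema(), kafkaProps("group-b")))
                .uid("pipeline-b-orders-source")
                .name("pipeline-b orders source");

        pipelineA.print().uid("pipeline-a-print-sink");
        pipelineB.print().uid("pipeline-b-print-sink");

        env.execute("pipelines-with-uids");
    }
}
```

Note that UIDs assigned this way only help for checkpoints and savepoints taken after they were introduced; state written under the old auto-generated IDs is still keyed by those IDs.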

If you want to explicitly set UIDs that match the currently auto-generated ones so you can safely evolve the job, you can find the hashed UIDs for each operator by inspecting the running job via the REST API (the vertexId for each operator is its hashed UID). Then you can use those hashed UIDs in combination with setUidHash() on those same operators in your code. See Matching Operator State in the Flink documentation.
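To make the lookup step concrete, here is a small sketch that pulls the vertex IDs out of the JSON returned by the REST endpoint /jobs/<jobid>. It uses only the JDK and a regular expression, and it assumes each entry of the "vertices" array lists "id" directly before "name" and that names contain no escaped quotes; a proper JSON library would be more robust. The class name, method name, and sample response are invented for the example.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class VertexIdLookup {

    // Matches a 32-character hex "id" immediately followed by a "name" field.
    // The field order is an assumption about the response layout.
    private static final Pattern VERTEX = Pattern.compile(
            "\"id\"\\s*:\\s*\"([0-9a-f]{32})\"\\s*,\\s*\"name\"\\s*:\\s*\"([^\"]*)\"");

    // Returns vertex name -> vertex id, in the order they appear in the JSON.
    static Map<String, String> vertexIdsByName(String jobJson) {
        Map<String, String> idsByName = new LinkedHashMap<>();
        Matcher matcher = VERTEX.matcher(jobJson);
        while (matcher.find()) {
            idsByName.put(matcher.group(2), matcher.group(1));
        }
        return idsByName;
    }

    public static void main(String[] args) {
        // Made-up sample; in practice this would be the body of
        // GET http://<jobmanager>:8081/jobs/<jobid> for the running job.
        String sampleResponse =
                "{\"jid\":\"00000000000000000000000000000000\",\"vertices\":["
                + "{\"id\":\"cbc357ccb763df2852fee8c4fc7d55f2\",\"name\":\"Source: orders\",\"parallelism\":4},"
                + "{\"id\":\"90bea66de1c231edf33913ecd54406c1\",\"name\":\"Sink: print\",\"parallelism\":4}"
                + "]}";

        vertexIdsByName(sampleResponse)
                .forEach((name, id) -> System.out.println(name + " -> " + id));
    }
}
```

Each ID found this way would then be passed to setUidHash("<32-character hex id>") on the matching operator in the job code. Keep in mind that a vertex can contain several chained operators, so it is worth confirming which operator a given vertex ID belongs to before pinning it.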

Upvotes: 1
