Kendo
Reputation: 11

Is there any workaround for Flink HA restarting a failed job without a checkpoint/savepoint?

I am using a Flink application to process unbounded Kafka streams and generate aggregated data.

Flink version: 1.19, Application Cluster mode (with 1 standby JobManager)

With the following config values:
- high-availability.type: kubernetes
- high-availability.storageDir: <s3a path value>
- state.backend.type: rocksdb
- state.backend.incremental: true
- state.checkpoints.dir: <s3a path value>
- checkpointingMode: AT_LEAST_ONCE
- ExternalizedCheckpointCleanup: RETAIN_ON_CANCELLATION

First:

I tried the failure-rate restart strategy, which caused a problem with restoring the job after the maximum restart attempts were exhausted.

The issue was that, after exhausting the maximum retry attempts, the job went into Failed status, which resulted in cleanup of the ConfigMap (the checkpoint ID to restore from on restart was cleaned up), and then the JobManager was terminated.

But because of HA, the JobManager pod was re-created and restarted the job without restoring from any checkpoint.
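
For reference, a failure-rate setup of this kind looks roughly like the sketch below. The keys are the standard Flink restart-strategy options; the values are illustrative assumptions, not the exact ones from my job.

```yaml
# Sketch only: all values below are assumed for illustration.
restart-strategy.type: failure-rate
# Once more failures than this occur within the interval, the job goes to Failed.
restart-strategy.failure-rate.max-failures-per-interval: 3
# Time window over which failures are counted.
restart-strategy.failure-rate.failure-rate-interval: 5 min
# Wait time between two consecutive restart attempts.
restart-strategy.failure-rate.delay: 10 s
```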

Then,

I switched the restart strategy to exponential-delay. The job can now restart from the last successful checkpoint an unlimited (Integer.MAX_VALUE) number of times.
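
An exponential-delay setup along these lines is sketched below. The keys are the standard Flink 1.19 options for this strategy; the values are illustrative assumptions, apart from the unlimited attempt count mentioned above.

```yaml
# Sketch only: backoff values below are assumed for illustration.
restart-strategy.type: exponential-delay
# Delay before the first restart attempt.
restart-strategy.exponential-delay.initial-backoff: 1 s
# Upper bound on the delay between restart attempts.
restart-strategy.exponential-delay.max-backoff: 5 min
# Factor by which the delay grows after each failure.
restart-strategy.exponential-delay.backoff-multiplier: 1.5
# How long the job must run without failure before the delay resets.
restart-strategy.exponential-delay.reset-backoff-threshold: 1 h
# Unlimited attempts (Integer.MAX_VALUE), so the job never reaches Failed.
restart-strategy.exponential-delay.attempts-before-reset-backoff: 2147483647
```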

However, I have the concerns below.

Our job may come across one of the following kinds of failures:

  1. A code/system performance issue, or Kafka downtime.
  2. A runtime exception, such as a bad message on a Kafka topic.

For #1, restarting N times makes sense. With restarts there is still a chance that the job recovers, moves on to the next offsets, and therefore completes new checkpoints.

For #2, however, the job is going to fail an infinite number of times, and infinite restarting won't be able to fix the job.

Questions:

  1. Why does it restore from a checkpoint during the restart attempts, but start from scratch once all attempts are exhausted?
  2. Can we configure Flink to retain the checkpoint when the job is marked as failed, so that when HA restarts it, it restores from the last checkpoint?

Upvotes: 0

Views: 113

Answers (0)
