Reputation: 87
I have a Flink app (Flink version 1.9.2) with checkpointing enabled. When I run it on the Apache Flink platform, checkpoints keep failing with the message: "Checkpoint expired before completing." After checking the taskManager's thread dumps during a checkpoint, I found that a thread containing two operators that call an external service was always in the RUNNABLE state. Below are the design of these operators and the checkpoint configuration. Please advise how to resolve the issue.
operator design:
public class OperatorA extends RichMapFunction<POJOA, POJOA> {
private Connection connection;
private String getCusipSourceIdPairsQuery;
private String getCusipListQuery;
private MapState<String, List<POJOX>> modifiedCusipState;
private MapState<String, List<POJOX>> bwicMatchedModifiedCusipState;
@Override
public POJOA map(POJOA value) throws Exception {
// create a local PreparedStatement each time this map method is invoked
// update/clear those two MapStates
}
@Override
public void open(Configuration parameters) {
// initialize jdbc connection and TTL MapStates using GlobalJobParameters
}
@Override
public void close() {
// close jdbc connection
}
}
public class OperatorB extends RichMapFunction<POJOA, POJOA> {
private MyServiceA serviceA;
private MyServiceB serviceB;
@Override
public POJOA map(POJOA value) throws Exception {
// call a RESTful GET API of ServiceB and get an XML response with about 500 fields.
// use serviceA's functions to parse the XML document and populate the fields of value.
}
@Override
public void open(Configuration parameters) {
// initialize a local jdbc connection and PreparedStatement using globalJobParameters, then use the query results to initialize serviceA.
// initialize serviceB.
}
}
checkpoint configuration:
Checkpointing Mode Exactly Once
Interval 15m 0s
Timeout 10m 0s
Minimum Pause Between Checkpoints 5m 0s
Maximum Concurrent Checkpoints 1
Persist Checkpoints Externally Disabled
Sample checkpoint history:
ID Status Acknowledged Trigger Time Latest Acknowledgement End to End Duration State Size Buffered During Alignment
20 In Progress 3/12 (25%) 15:03:13 15:04:14 1m 1s 5.65 KB 0 B
19 Failed 3/12 14:48:13 14:50:12 10m 0s 5.65 KB 0 B
18 Failed 3/12 14:33:13 14:34:50 10m 0s 5.65 KB 0 B
17 Failed 4/12 14:18:13 14:27:04 9m 59s 2.91 MB 64.0 KB
16 Failed 3/12 14:03:13 14:05:18 10m 0s 5.65 KB 0 B
Upvotes: 3
Views: 2269
Reputation: 3922
I recently encountered a similar problem. Suggestions provided by @David Anderson are really good! Nevertheless, I have a few things to add.
You can try to tune your checkpointing according to the Apache Flink documentation.
In my case, the checkpoint interval was lower than the minimum pause between checkpoints, so I increased it: I multiplied the checkpoint interval by 2 and set that value as the minimum pause between checkpoints.
You can also try to increase the checkpoint timeout.
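For reference, these settings can also be applied programmatically on the execution environment. Below is a minimal sketch, assuming the job configures checkpointing in code; the concrete numbers are only placeholders to illustrate the relationship between interval, minimum pause, and timeout, not a recommendation:
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Placeholder values: checkpoint every 15 minutes in exactly-once mode.
env.enableCheckpointing(15 * 60 * 1000L, CheckpointingMode.EXACTLY_ONCE);
// Minimum pause between the end of one checkpoint and the start of the next.
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5 * 60 * 1000L);
// Give slow checkpoints more time before they are declared expired.
env.getCheckpointConfig().setCheckpointTimeout(20 * 60 * 1000L);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);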
Another issue may be ValueState. My pipeline was keeping state for a long period of time and it wasn't being evicted, which was causing throughput problems. I set a TTL for the ValueState (in my case, 30 minutes) and it started to work better. TTL is well described in the Apache Flink documentation. It's really simple and looks like this:
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

StateTtlConfig ttlConfig = StateTtlConfig
        .newBuilder(Time.seconds(1))
        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
        .build();

ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("text state", String.class);
stateDescriptor.enableTimeToLive(ttlConfig);
It's also worth noting that this SO thread covers a similar topic: Flink Checkpoint Failure - Checkpoints time out after 10 mins, and the tips provided there may be useful.
Regards,
Piotr
Upvotes: 1
Reputation: 43697
Doing any sort of blocking i/o in a Flink user function (e.g., a RichMap or ProcessFunction) is asking for trouble with checkpointing. The reason is that it is very easy to end up with significant backpressure, which then prevents the checkpoint barriers from making sufficiently rapid progress through the execution graph, leading to checkpoint timeouts.
The preferred way to improve on this would be to use async i/o rather than a RichMap. This will allow for there to be more outstanding requests at any given moment (assuming the external service is capable of handling the higher load), and won't leave the operator blocked in user code waiting for responses to synchronous requests -- thereby allowing checkpoints to progress unimpeded.
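A minimal sketch of what that could look like with Flink's async I/O API (available in 1.9). The class name AsyncEnrichFunction, the fetchAndPopulate method, the thread-pool size, and the timeout/capacity values are placeholders standing in for the services in the question, not part of it:
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class AsyncEnrichFunction extends RichAsyncFunction<POJOA, POJOA> {

    private transient MyServiceB serviceB;      // the REST client from the question
    private transient ExecutorService executor; // runs the blocking HTTP calls off the task thread

    @Override
    public void open(Configuration parameters) {
        serviceB = new MyServiceB();            // initialize as in OperatorB#open
        executor = Executors.newFixedThreadPool(10);
    }

    @Override
    public void asyncInvoke(POJOA input, ResultFuture<POJOA> resultFuture) {
        // Hand the blocking call to the executor and complete the future from its callback,
        // so the Flink task thread is never stuck waiting for the external service.
        CompletableFuture
                .supplyAsync(() -> serviceB.fetchAndPopulate(input), executor) // hypothetical client method
                .thenAccept(enriched -> resultFuture.complete(Collections.singleton(enriched)));
    }

    @Override
    public void timeout(POJOA input, ResultFuture<POJOA> resultFuture) {
        // Decide what a slow request should produce; here the input is passed through unchanged.
        resultFuture.complete(Collections.singleton(input));
    }

    @Override
    public void close() {
        executor.shutdown();
    }
}
It would then be wired into the pipeline with something like AsyncDataStream.unorderedWait(inputStream, new AsyncEnrichFunction(), 30, TimeUnit.SECONDS, 100) instead of the synchronous .map(...) call, where the timeout and capacity arguments are arbitrary placeholders that bound how long and how many requests may be in flight.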
An alternative would be to increase the parallelism of your cluster, which should reduce the backpressure, but at the expense of tying up more computing resources that won't really be doing much other than waiting.
In the worst case, where the external service simply isn't capable of keeping up with your throughput requirements, backpressure is unavoidable. That is going to be more difficult to manage, but unaligned checkpoints, coming in Flink 1.11, should help.
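For completeness, once on Flink 1.11 or later (not the 1.9.2 version used here), unaligned checkpoints would be switched on via the checkpoint config; a minimal sketch:
// Flink 1.11+ only: lets checkpoint barriers overtake buffered records under backpressure.
env.getCheckpointConfig().enableUnalignedCheckpoints();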
Upvotes: 2
Reputation: 1009
Here are some tips that I usually use when diagnosing an expired-checkpoint problem:
Upvotes: 1