gcandal

Reputation: 957

Why does checkpointing impact latency so much?

I'm observing that enabling checkpointing, even with the memory state backend, causes an unexpectedly large increase in observed latency.

Consider the following checkpoint:

2019-02-27 15:35:46,322 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 2 @ 1551281746322 for job a80597b3312f0704beed75397c371bf5.
2019-02-27 15:35:46,326 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend     - Heap backend snapshot (In-Memory Stream Factory, synchronous part) in thread Thread[KeyedProcess -> Map -> Sink: Unnamed (1/1),5,Flink Task Threads] took 0 ms.
2019-02-27 15:35:46,342 INFO  org.apache.flink.runtime.state.DefaultOperatorStateBackend    - DefaultOperatorStateBackend snapshot (In-Memory Stream Factory, synchronous part) in thread Thread[Async calls on Source: Custom Source -> Map -> Timestamps/Watermarks (1/1),5,Flink Task Threads] took 2 ms.
2019-02-27 15:35:46,346 INFO  org.apache.flink.runtime.state.DefaultOperatorStateBackend    - DefaultOperatorStateBackend snapshot (In-Memory Stream Factory, asynchronous part) in thread Thread[pool-14-thread-2,5,Flink Task Threads] took 3 ms.
2019-02-27 15:35:46,351 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend     - Heap backend snapshot (In-Memory Stream Factory, asynchronous part) in thread Thread[pool-11-thread-2,5,Flink Task Threads] took 14 ms.
2019-02-27 15:35:46,378 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 2 for job a80597b3312f0704beed75397c371bf5 (1157653 bytes in 54 ms).

Even though the checkpoint completed end to end in just 54 ms, the response for the event injected at 15:35:46,385 only arrived at 15:35:46,905 (520 ms later). No events were processed between these two timestamps. Without checkpointing, the 99.99th-percentile latency is ~15 ms.

Setup:

Edit: this is a linear job, so I assume there is no checkpoint barrier alignment involved.

Upvotes: 2

Views: 360

Answers (1)

gcandal

Reputation: 957

The time is being spent in the synchronous ACK'ing of messages to RabbitMQ (MessageAcknowledgingSourceBase#notifyCheckpointComplete > MultipleIdsMessageAcknowledgingSourceBase#acknowledgeIDs > RMQSource#acknowledgeSessionIDs). This could probably be done asynchronously, as the Kafka connector does.
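
To illustrate what "asynchronously" could look like, here is a minimal sketch in plain Java. It is not the RMQSource implementation: `AsyncAcker`, `acknowledgeAsync` and the `ackOne` callback are hypothetical names, and the callback stands in for whatever actually acknowledges one delivery tag. The idea is only that the checkpoint-complete callback hands the batch to a dedicated thread and returns immediately.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.LongConsumer;

/** Hypothetical helper: acknowledges delivery tags on a background thread. */
final class AsyncAcker implements AutoCloseable {

    // A single thread keeps the acks in order and avoids concurrent use of the callback.
    private final ExecutorService ackThread = Executors.newSingleThreadExecutor();

    // Assumption: this callback acknowledges exactly one delivery tag on the broker.
    private final LongConsumer ackOne;

    AsyncAcker(LongConsumer ackOne) {
        this.ackOne = ackOne;
    }

    /** Returns immediately; the future completes once every tag has been acknowledged. */
    CompletableFuture<Void> acknowledgeAsync(List<Long> deliveryTags) {
        // Copy the batch so the caller can keep collecting tags for the next checkpoint.
        List<Long> batch = new ArrayList<>(deliveryTags);
        return CompletableFuture.runAsync(() -> {
            for (long tag : batch) {
                ackOne.accept(tag);
            }
        }, ackThread);
    }

    @Override
    public void close() {
        // Already submitted batches still run; no new ones are accepted.
        ackThread.shutdown();
    }
}
```

A real change would also have to deal with failed acknowledgements, shutdown ordering, and the fact that the underlying RabbitMQ channel should not be used from several threads at once, so treat this purely as an outline of the approach.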

Because my checkpoint interval is 3 minutes and I'm injecting 200 ev/s, each checkpoint triggers the acknowledgement of 36k messages (200*60*3), which takes around 500 ms.
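
The arithmetic above can be written out as a small estimate. This is a back-of-the-envelope sketch that assumes the acknowledgement cost grows linearly with the number of messages; the class and method names are made up for illustration, and the 10-second interval in `main` is just an example value, not something I measured.

```java
import java.time.Duration;

/** Rough estimate of the acknowledgement pause caused by one checkpoint. */
final class AckCostEstimate {

    /** Messages that accumulate between two completed checkpoints. */
    static long messagesPerCheckpoint(long eventsPerSecond, Duration checkpointInterval) {
        return eventsPerSecond * checkpointInterval.getSeconds();
    }

    /** Estimated pause in milliseconds, assuming a constant cost per acknowledgement. */
    static double estimatedPauseMillis(long messages, double microsPerAck) {
        return messages * microsPerAck / 1000.0;
    }

    public static void main(String[] args) {
        // Observed: 200 ev/s with a 3-minute interval, roughly 500 ms spent acknowledging.
        long observedBatch = messagesPerCheckpoint(200, Duration.ofMinutes(3));
        double microsPerAck = 500_000.0 / observedBatch;

        // Hypothetical shorter interval, to see how the pause would scale.
        long smallerBatch = messagesPerCheckpoint(200, Duration.ofSeconds(10));

        System.out.printf("messages per checkpoint: %d%n", observedBatch);
        System.out.printf("cost per ack: %.1f us%n", microsPerAck);
        System.out.printf("estimated pause at 10 s interval: %.1f ms%n",
                estimatedPauseMillis(smallerBatch, microsPerAck));
    }
}
```

With the observed numbers this works out to about 14 µs per acknowledgement, so under the linear assumption the pause shrinks in proportion to the checkpoint interval.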

Using a smaller interval might help make latency more predictable, at the cost of a higher median latency.

Upvotes: 1
