Reputation: 957
I'm observing that enabling checkpointing, even with the memory state backend, causes an unexpected increase in the observed latencies.
Consider the following checkpoint:
2019-02-27 15:35:46,322 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 2 @ 1551281746322 for job a80597b3312f0704beed75397c371bf5.
2019-02-27 15:35:46,326 INFO org.apache.flink.runtime.state.heap.HeapKeyedStateBackend - Heap backend snapshot (In-Memory Stream Factory, synchronous part) in thread Thread[KeyedProcess -> Map -> Sink: Unnamed (1/1),5,Flink Task Threads] took 0 ms.
2019-02-27 15:35:46,342 INFO org.apache.flink.runtime.state.DefaultOperatorStateBackend - DefaultOperatorStateBackend snapshot (In-Memory Stream Factory, synchronous part) in thread Thread[Async calls on Source: Custom Source -> Map -> Timestamps/Watermarks (1/1),5,Flink Task Threads] took 2 ms.
2019-02-27 15:35:46,346 INFO org.apache.flink.runtime.state.DefaultOperatorStateBackend - DefaultOperatorStateBackend snapshot (In-Memory Stream Factory, asynchronous part) in thread Thread[pool-14-thread-2,5,Flink Task Threads] took 3 ms.
2019-02-27 15:35:46,351 INFO org.apache.flink.runtime.state.heap.HeapKeyedStateBackend - Heap backend snapshot (In-Memory Stream Factory, asynchronous part) in thread Thread[pool-11-thread-2,5,Flink Task Threads] took 14 ms.
2019-02-27 15:35:46,378 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 2 for job a80597b3312f0704beed75397c371bf5 (1157653 bytes in 54 ms).
Even though the end-to-end checkpoint duration was just 54 ms, the response for the event injected at 15:35:46,385 only arrived at 15:35:46,905 (520 ms later). Between these two timestamps no events were processed. Without checkpointing, the latency at the 99.99th percentile is ~15 ms.
Setup:
Latency is measured as the System.nanoTime difference between injecting an event and receiving the response.
Edit: this is a linear job, so I guess there's no alignment of the checkpoint barriers.
Upvotes: 2
Views: 360
Reputation: 957
The time is being spent in the synchronous ACK'ing of messages to RabbitMQ (MessageAcknowledgingSourceBase#notifyCheckpointComplete > MultipleIdsMessageAcknowledgingSourceBase#acknowledgeIDs > RMQSource#acknowledgeSessionIDs). This could probably be done asynchronously, like the Kafka connector does.
Because my checkpoint interval is 3 minutes and I'm injecting 200 ev/s, this means that each checkpoint triggers the acknowledgement of 36k messages (200*60*3), which is taking around 500ms.
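The arithmetic can be double-checked with a small sketch. The class and method names here are hypothetical; the only inputs are the figures from this answer (200 ev/s, a 3-minute interval, a pause of around 500 ms). It also derives the implied average cost per ACK, assuming the whole pause is spent ACK'ing and every ACK costs about the same.

```java
import java.util.Locale;

public class AckBacklog {
    // Messages that accumulate between two checkpoints at a steady injection rate.
    static long messagesPerCheckpoint(long eventsPerSecond, long intervalSeconds) {
        return eventsPerSecond * intervalSeconds;
    }

    // Average cost of a single ACK in microseconds, assuming the entire
    // pause is spent acknowledging messages (an assumption, not a measurement).
    static double microsPerAck(double pauseMillis, long messages) {
        return pauseMillis * 1000.0 / messages;
    }

    public static void main(String[] args) {
        long messages = messagesPerCheckpoint(200, 3 * 60);
        System.out.println(messages + " messages per checkpoint");
        System.out.println(String.format(Locale.ROOT, "~%.1f us per ACK",
                microsPerAck(500.0, messages)));
    }
}
```

That works out to roughly 14 microseconds per message, so each individual ACK is cheap; the pause comes from doing 36k of them back to back on the task thread.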
Using a smaller interval might help make the latency more predictable, at the cost of a higher median latency.
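To get a feel for that trade-off, here is a rough extrapolation of the pause per checkpoint for a few intervals. It assumes the ACK time scales linearly with the number of pending messages (about 500 ms for 36k messages, the figures above), which is an assumption I have not measured. The class and method names are hypothetical.

```java
public class IntervalTradeoff {
    // Reference point taken from the observation in this answer.
    static final double MEASURED_PAUSE_MS = 500.0;
    static final long MEASURED_MESSAGES = 36_000;

    // Estimated pause per checkpoint, assuming ACK time grows linearly
    // with the number of messages pending acknowledgement.
    static double estimatedPauseMillis(long eventsPerSecond, long intervalSeconds) {
        long pending = eventsPerSecond * intervalSeconds;
        return MEASURED_PAUSE_MS * pending / MEASURED_MESSAGES;
    }

    public static void main(String[] args) {
        long[] intervalsSeconds = {10, 30, 60, 180};
        for (long s : intervalsSeconds) {
            System.out.println(s + " s interval -> ~"
                    + estimatedPauseMillis(200, s) + " ms pause per checkpoint");
        }
    }
}
```

Under this assumption the total time spent ACK'ing stays the same; it is just split into smaller, more frequent pauses, and any fixed per-checkpoint overhead is paid more often.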
Upvotes: 1