Reputation: 15
Case
ReplyingKafkaTemplate instances.
ConcurrentMessageListenerContainer created using @KafkaListener and @SendTo annotations on a method.
ContainerStoppingErrorHandler.
Current Behavior
When the server comes back up, it continues processing old requests that would already have timed out.
Desired Behavior
Instead, it would be better to continue from the latest message, skipping any unprocessed messages, since the corresponding requests would time out and be retried.
Questions
Upvotes: 0
Views: 782
Reputation: 121560
Your @KafkaListener class must extend AbstractConsumerSeekAware and do something like this:
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
    super.onPartitionsAssigned(assignments, callback);
    callback.seekToEnd(assignments.keySet());
}
So every time partitions are assigned to your consumer (for example, when it joins the group), it seeks all the assigned partitions to the end, skipping all the old records.
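To make the effect on offsets concrete, here is a small dependency-free sketch of what "seek to the end on assignment" means. It is a toy model only: `ToyConsumer`, `seekAllToEnd`, and the partition names are hypothetical stand-ins and are not part of Kafka or Spring Kafka. It assumes each partition has a log end offset and a committed offset, and that a consumer would otherwise resume from the committed offset.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model only: none of these types come from Kafka or Spring Kafka.
class SeekToEndSketch {

    // Hypothetical stand-in for a consumer's view of its assigned partitions.
    static final class ToyConsumer {
        // Offset the next produced record would get, per partition.
        private final Map<String, Long> logEndOffsets;
        // Offset the consumer will read next, per partition.
        private final Map<String, Long> positions = new HashMap<>();

        ToyConsumer(Map<String, Long> logEndOffsets, Map<String, Long> committedOffsets) {
            this.logEndOffsets = new HashMap<>(logEndOffsets);
            // Without any seek, consumption resumes from the committed offset.
            this.positions.putAll(committedOffsets);
        }

        // Models seeking every assigned partition to its end on assignment.
        // Returns the total number of old records that will never be read.
        long seekAllToEnd() {
            long skipped = 0;
            for (Map.Entry<String, Long> end : logEndOffsets.entrySet()) {
                long current = positions.getOrDefault(end.getKey(), 0L);
                skipped += end.getValue() - current;
                positions.put(end.getKey(), end.getValue());
            }
            return skipped;
        }

        long position(String partition) {
            return positions.getOrDefault(partition, 0L);
        }
    }

    public static void main(String[] args) {
        // Partition "requests-0" has 7 unprocessed records, "requests-1" has none.
        ToyConsumer consumer = new ToyConsumer(
                Map.of("requests-0", 10L, "requests-1", 4L),
                Map.of("requests-0", 3L, "requests-1", 4L));
        System.out.println("skipped = " + consumer.seekAllToEnd());
        System.out.println("next offset on requests-0 = " + consumer.position("requests-0"));
    }
}
```

In this model the 7 requests that piled up while the server was down are never processed, which matches the desired behavior in the question: the callers are expected to time out and retry.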
Upvotes: 2