Reputation: 73
I'm a bit confused about the poll() behaviour of (Spring) Kafka when/after stopping the ConcurrentMessageListenerContainer.
What I want to achieve: stop the consumer after an exception is raised (for example, a message could not be saved to the database), do not commit the offset, restart the consumer after a given time, and resume processing from the message that failed.
I read this article, which says that the container will call the listener with the remaining records from the poll. That means a later record from the same poll that is processed successfully can still commit its offset, past the failed message. This could end up in lost/skipped messages.
Is this really the case, and if so, is there a way to solve this without upgrading to a newer version? (A DLQ is not a solution in my case.)
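To make the concern concrete, here is a minimal, stdlib-only simulation of the scenario. It is not Spring Kafka code: the class and method names are hypothetical, and it only assumes per-record commits (RECORD ack mode) where a failed record commits nothing but later records from the same poll are still delivered.

```java
import java.util.Arrays;
import java.util.List;

// Simplified model of one poll, NOT the real container logic (assumption).
public class SkippedOffsetDemo {

    /**
     * Walks through the offsets returned by a single poll and returns the
     * committed position (the next offset to read) afterwards.
     */
    static long processPoll(List<Long> polledOffsets, long failingOffset, long committed) {
        for (long offset : polledOffsets) {
            if (offset == failingOffset) {
                // Listener threw: with ackOnError=false nothing is committed for this record.
                continue;
            }
            // Successful record: its offset + 1 becomes the committed position.
            committed = offset + 1;
        }
        return committed;
    }
}
```

With offsets 0..3 in one poll and a failure at offset 1, the committed position still ends at 4, so a restarted consumer would resume after the failed record instead of at it.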
What I already did:
Set setErrorHandler() and setAckOnError(false):
private Map<String, Object> getConsumerProps(CustomKafkaProps kafkaProps, Class<?> keyDeserializer) {
    Map<String, Object> props = new HashMap<>();
    // Set common props
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProps.getBootstrapServers());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProps.getConsumerGroupId());
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // Start with the first message when a new consumer group (app) arrives at the topic
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // We will use "RECORD" AckMode in the Spring Listener Container
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);
    if (kafkaProps.isSslEnabled()) {
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
        props.put("ssl.keystore.location", kafkaProps.getKafkaKeystoreLocation());
        props.put("ssl.keystore.password", kafkaProps.getKafkaKeystorePassword());
        props.put("ssl.key.password", kafkaProps.getKafkaKeyPassword());
    }
    return props;
}
public ConcurrentMessageListenerContainer<String, byte[]> kafkaReceiverContainer(CustomKafkaProps kafkaProps) throws Exception {
    StoppingErrorHandler stoppingErrorHandler = new StoppingErrorHandler();
    ContainerProperties containerProperties = new ContainerProperties(...);
    ConcurrentMessageListenerContainer<String, byte[]> container = ...
    container.setConcurrency(1); // use only one consumer thread
    return container;
}
Error Handler
public class StoppingErrorHandler implements ErrorHandler {
    private ConcurrentMessageListenerContainer concurrentMessageListenerContainer;
    int consumerHaltTimeout;

    public void handle(Exception thrownException, ConsumerRecord<?, ?> record) {
        if (concurrentMessageListenerContainer != null) {
            concurrentMessageListenerContainer.stop(); // stop consuming after the failure
        }
        new Timer().schedule(new TimerTask() {
            public void run() {
                if (concurrentMessageListenerContainer != null && !concurrentMessageListenerContainer.isRunning()) {
                    concurrentMessageListenerContainer.start(); // restart once the halt timeout has elapsed
                }
            }
        }, consumerHaltTimeout);
    }
}
What I'm using:
Upvotes: 6
Views: 8358
Reputation: 174799
without upgrading the newer versions?
2.1 introduced the ContainerStoppingErrorHandler, which is a ContainerAwareErrorHandler; the remaining unconsumed messages are discarded (and will be re-fetched when the container is restarted).
With earlier versions, your listener will need to reject (fail) the remaining messages in the batch (or set max.poll.records=1, so that each poll returns at most one record).
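One way to reject the remaining messages could look roughly like the sketch below. It is stdlib-only and does not use any Spring Kafka types: the class name, the Consumer-based delegate, and the reset() hook are all assumptions for illustration. The idea is that once one record fails, every later record is failed as well, so none of them can commit an offset past the failed one.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical wrapper around the real record-processing logic (assumption).
public class RejectAfterFailureListener<T> {

    private final AtomicBoolean failed = new AtomicBoolean(false);
    private final Consumer<T> delegate;

    public RejectAfterFailureListener(Consumer<T> delegate) {
        this.delegate = delegate;
    }

    /** Call this from the actual message listener for each record. */
    public void onMessage(T record) {
        if (failed.get()) {
            // An earlier record failed: reject this one too, so it is not acknowledged.
            throw new IllegalStateException("Rejected: an earlier record failed");
        }
        try {
            delegate.accept(record);
        } catch (RuntimeException e) {
            failed.set(true); // remember the failure for the records that follow
            throw e;
        }
    }

    /** Call this before the container is restarted, so processing can resume. */
    public void reset() {
        failed.set(false);
    }
}
```

The real listener would delegate to onMessage(...), and the code that restarts the container (for example the timer in the error handler) would call reset() first. This only helps if the error handler does not commit failed records, i.e. ackOnError stays false.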
Upvotes: 2