Mykhaylo Adamovych

Reputation: 20966

Spring Kafka listener infinite loop on error

I have a Spring Kafka listener configured as follows:

@Bean
public Map<String, Object> consumerConfig() {
    final HashMap<String, Object> result = new HashMap<>();
    result.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    result.put(GROUP_ID_CONFIG, groupId);
    result.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    result.put(VALUE_DESERIALIZER_CLASS_CONFIG, MyKafkaJacksonRulesExecutionResultsDeserializer.class);
    return result;
}

@Bean
public ConsumerFactory<Long, MessageResult> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfig());
}

@Bean
public ConcurrentKafkaListenerContainerFactory<Long, MessageResult> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Long, MessageResult> containerFactory = new ConcurrentKafkaListenerContainerFactory<>();
    containerFactory.setConsumerFactory(consumerFactory());
    containerFactory.setConcurrency(KAFKA_LISTENER_THREADS_COUNT);
    containerFactory.getContainerProperties().setPollTimeout(KAFKA_LISTENER_POLL_TIMEOUT);
    containerFactory.getContainerProperties().setAckOnError(true);
    containerFactory.getContainerProperties().setAckMode(RECORD);
    return containerFactory;
}

and used as

@KafkaListener(topics = "${spring.kafka.out-topic}")
public void processSrpResults(MessageResult result) {

The deserializer throws an exception during deserialization, which causes an infinite loop: the listener can never fetch the message, so the same record is polled again and again.

How can I make the Kafka listener commit the offset on error?

Upvotes: 9

Views: 3855

Answers (2)

Jayavardhan Gange

Reputation: 305

You can use org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2 as the value deserializer in the consumer configuration. It delegates to your real deserializer and catches any exception it throws, which gives you a chance to handle the deserialization error gracefully and still lets the offsets of the failed records be committed. Usage and more details can be found at https://docs.spring.io/spring-kafka/reference/html/#error-handling-deserializer
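As a rough sketch of what that configuration could look like for the consumerConfig() bean in the question: the property names below are written as plain strings (the values behind ConsumerConfig's constants and ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS) so the snippet compiles without Kafka on the classpath. The com.example package of the delegate and the ErrorHandlingConsumerProps class name are assumptions made for illustration. Also note that, as far as I know, newer Spring Kafka releases rename the class to ErrorHandlingDeserializer, so check the version you are on.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper; in the question this logic would live in consumerConfig().
final class ErrorHandlingConsumerProps {

    // Assumption: the real package of the asker's deserializer is unknown.
    static final String DELEGATE_VALUE_DESERIALIZER =
            "com.example.MyKafkaJacksonRulesExecutionResultsDeserializer";

    static Map<String, Object> build(String bootstrapServers, String groupId) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Kafka instantiates the error-handling wrapper as the value deserializer...
        props.put("value.deserializer",
                "org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2");
        // ...and the wrapper delegates to the deserializer that may throw.
        props.put("spring.deserializer.value.delegate.class", DELEGATE_VALUE_DESERIALIZER);
        return props;
    }
}
```

The only change compared with the question is that the original deserializer moves from value.deserializer to the delegate property, so the exception is caught inside the wrapper instead of escaping from poll().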

Upvotes: 0

Jim K

Reputation: 121

I created a subclass of the deserializer that was throwing the exception; it catches the exception and returns null instead. Then I used that subclass as the deserializer in my configuration. Your processor then has to handle null objects.

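import java.nio.charset.StandardCharsets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// ExceptionThrowingDeserializer stands for whichever deserializer was throwing.
// Assumes SLF4J is available for logging.
public class MyErrorHandlingDeserializer extends ExceptionThrowingDeserializer {

    private static final Logger log = LoggerFactory.getLogger(MyErrorHandlingDeserializer.class);

    @Override
    public Object deserialize(String topic, byte[] data) {
        try {
            return super.deserialize(topic, data);
        } catch (Exception e) {
            // Log the bad payload and return null so the record is skipped instead of retried forever.
            log.error("Problem deserializing data {} on topic {}", new String(data, StandardCharsets.UTF_8), topic, e);
            return null;
        }
    }
}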
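To illustrate the "handle null objects" part, here is a minimal sketch of a null-tolerant processor. NullTolerantResultProcessor and its counters are hypothetical names, and the class is kept free of Spring so it runs on its own. In the real application the method would be the @KafkaListener method from the question, and I believe the parameter also needs @Payload(required = false) for Spring Kafka to pass a null value through, so treat that as something to verify.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the class that holds the @KafkaListener method.
final class NullTolerantResultProcessor {

    private final List<Object> processed = new ArrayList<>();
    private int skipped = 0;

    // Returns true if the result was processed, false if it was skipped.
    boolean processSrpResults(Object result) {
        if (result == null) {
            // The deserializer returned null for a bad record: skip it and
            // return normally instead of throwing.
            skipped++;
            return false;
        }
        processed.add(result); // placeholder for the real business logic
        return true;
    }

    int processedCount() {
        return processed.size();
    }

    int skippedCount() {
        return skipped;
    }
}
```

Because the method returns normally for a null payload, the container should treat the record as handled and commit its offset under the RECORD ack mode from the question, which is what breaks the loop.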

Upvotes: 6
