Reputation: 1888
Is there a way to set property ackOnError=false using spring boot application.properties file like other listener properties such as:
spring.kafka.listener.ack-mode
spring.kafka.listener.ack-count
spring.kafka.listener.ack-time
spring.kafka.listener.poll-timeout
?
If it is not possible, how cat I combine: properties from file + java config? I don't want to set all kafka properties in java-config like this:
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
......
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
return props;
}
I just want to override property ackOnError. Thank you in advance.
Upvotes: 4
Views: 2199
Reputation: 174554
It's not available as a property but you can override Boot's container factory @Bean
as follows...
@Bean
ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ConsumerFactory<Object, Object> kafkaConsumerFactory) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
configurer.configure(factory, kafkaConsumerFactory);
factory.getContainerProperties().setAckOnError(false);
return factory;
}
This will apply all the other boot properties as well.
However, this setting doesn't have much utility unless you also stop the container (e.g. with a ContainerStoppingErrorHandler
).
This is because the next successful record will have its offset committed anyway, which is beyond the offset of the failed record.
That said, in 2.3, it will be false
by default.
Upvotes: 2