Reputation: 93
I'm using the @KafkaListener
annotation in my spring boot app without creating a custom KafkaListenerContainerFactory
bean. I'm currently setting the spring.kafka.consumer.value-deserializer
property in my application.yml file to assign my deserializer and prefer to do this programmatically to have compile time checks. I realize creating my own KafkaListenerContainerFactory
allows me to set this property on the factory, but I'd like to avoid the boiler plate and some extra complications around getting the SSL setup.
Is there a simple way to set my value deserializer programmatically without creating my own KafkaListenerContainerFactory
?
Upvotes: 9
Views: 12659
Reputation: 1989
According to the docs
You can set additional properties like this:
@KafkaListener(topics = "test-transactional",
properties={"foo:bar","isolation.level:read_committed"})
public void listen(ConsumerRecord<?, ?> cr) throws Exception {
logger.info("got consumer record" + cr.toString());
}
@KafkaListener(topics = "test-transactional",
properties={"isolation.level:read_uncommitted"})
public void listenDifferent(ConsumerRecord<?, ?> cr) throws Exception {
logger.info("uncomitted: got consumer record" + cr.toString());
}
Upvotes: 4
Reputation: 174554
The deserializer goes on properties used to create the consumer factory, not the container factory, you can override boot's consumer factory as follows:
@Bean
public ConsumerFactory<?, ?> kafkaConsumerFactory(KafkaProperties properties) {
Map<String, Object> props = properties.buildConsumerProperties();
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MyDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
Upvotes: 6