Reputation: 590
I have a Spring Boot application that consumes JSON data from Kafka. AFAIK the only way to set a JsonDeserializer is through Java config, i.e. it can't be set in the .yaml (or .properties) file, e.g.:
@Configuration
public class KafkaConfig {

    @Bean
    public ConsumerFactory<String, DeserObject> consumerFactory( final KafkaProperties properties ) {
        JsonDeserializer<DeserObject> jsonDeserializer = new JsonDeserializer<>( DeserObject.class );
        return new DefaultKafkaConsumerFactory<>(
            properties.buildConsumerProperties(), // so consumer could still be configured in .yaml
            new StringDeserializer(),             // key deserializer
            jsonDeserializer                      // value deserializer
        );
    }

    @Bean
    public KafkaListenerContainerFactory kafkaListenerContainerFactory( final ConsumerFactory<String, DeserObject> consumerFactory ) {
        ConcurrentKafkaListenerContainerFactory<String, DeserObject> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory( consumerFactory );
        return factory;
    }
}
Now that we're creating the ListenerContainerFactory ourselves, the AckMode can't be set from the .yaml:
spring:
  kafka:
    listener:
      ack-mode: manual
because it wouldn't be picked up (the ListenerContainerFactory is no longer auto-configured by Boot, since we're creating it ourselves).
So the question is: is there any way to configure the AckMode from the yaml, or do we have to do it in Java?
@Bean
public KafkaListenerContainerFactory kafkaListenerContainerFactory( final ConsumerFactory<String, DeserObject> consumerFactory ) {
    ConcurrentKafkaListenerContainerFactory<String, DeserObject> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory( consumerFactory );
    factory.getContainerProperties().setAckMode( MANUAL ); // !! AckMode needs to be set in Java
    return factory;
}
I guess alternatively this could also be injected via KafkaProperties, but this feels hacky:
@Bean
public KafkaListenerContainerFactory kafkaListenerContainerFactory( final KafkaProperties properties, final ConsumerFactory<String, DeserObject> consumerFactory ) {
    ConcurrentKafkaListenerContainerFactory<String, DeserObject> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory( consumerFactory );
    factory.getContainerProperties().setAckMode( properties.getListener().getAckMode() );
    return factory;
}
Upvotes: 3
Views: 4794
Reputation: 174759
You need to use Boot's ConcurrentKafkaListenerContainerFactoryConfigurer to apply the Boot properties (including spring.kafka.listener.ack-mode) to your factory. This is the standard bean definition used when you don't provide one...
@Bean
@ConditionalOnMissingBean(name = "kafkaListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
        ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
        ConsumerFactory<Object, Object> kafkaConsumerFactory) {
    ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<Object, Object>();
    configurer.configure(factory, kafkaConsumerFactory);
    return factory;
}
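Applied to the configuration from the question, that could look roughly like the sketch below. It is not taken from Boot's sources. It assumes Spring Boot and spring-kafka are on the classpath, that DeserObject (the asker's class) lives in the same package, and that the configurer's configure method accepts only the <Object, Object> generic types, which is why the typed consumer factory goes through an unchecked cast. Package names may differ between Boot versions.

```java
import org.springframework.boot.autoconfigure.kafka.ConcurrentKafkaListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;

@Configuration
public class KafkaListenerConfig { // hypothetical class name, for illustration only

    @Bean
    public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<String, DeserObject> consumerFactory) {

        ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();

        // Assumption: configure(...) is declared with <Object, Object> types,
        // so the typed factory from the question is cast through a wildcard.
        @SuppressWarnings("unchecked")
        ConsumerFactory<Object, Object> untyped =
                (ConsumerFactory<Object, Object>) (ConsumerFactory<?, ?>) consumerFactory;

        // Sets the consumer factory and applies the spring.kafka.listener.* properties.
        configurer.configure(factory, untyped);
        return factory;
    }
}
```

With a bean like this in place, the ack-mode: manual entry under spring.kafka.listener in the yaml should be picked up again, so no setAckMode call is needed in Java.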
Upvotes: 4