laur

Reputation: 590

How to set ack-mode in .properties/.yaml when a custom valueDeserializer is used?

I have a Spring Boot application that consumes JSON data from Kafka. AFAIK the only way to set a JsonDeserializer is through Java config, i.e. it can't be set in the .yaml (or .properties) file, e.g.:

@Configuration
public class KafkaConfig {

    @Bean
    public ConsumerFactory<String, DeserObject> consumerFactory( final KafkaProperties properties ) {
        JsonDeserializer<DeserObject> jsonDeserializer = new JsonDeserializer<>( DeserObject.class );
        return new DefaultKafkaConsumerFactory<>(
                properties.buildConsumerProperties(),  // so consumer could still be configured in .yaml
                new StringDeserializer(),              // key deserializer
                jsonDeserializer                       // value deserializer
        );
    }

    @Bean
    public KafkaListenerContainerFactory kafkaListenerContainerFactory( final ConsumerFactory<String, DeserObject> consumerFactory ) {
        ConcurrentKafkaListenerContainerFactory<String, DeserObject> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory( consumerFactory );
        return factory;
    }
}

Now that we're creating ListenerContainerFactory ourselves, the AckMode can't be set from the .yaml:

spring:
  kafka:
    listener:
      ack-mode: manual

because it wouldn't be picked up (Boot doesn't auto-configure the ListenerContainerFactory, since we're creating it ourselves).

So the question is: is there any way to configure the AckMode from the yaml, or do we have to do it in Java?

@Bean
public KafkaListenerContainerFactory kafkaListenerContainerFactory( final ConsumerFactory<String, DeserObject> consumerFactory ) {
    ConcurrentKafkaListenerContainerFactory<String, DeserObject> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory( consumerFactory );
    factory.getContainerProperties().setAckMode( ContainerProperties.AckMode.MANUAL );  // !! AckMode needs to be set in Java
    return factory;
}

Alternatively, I guess the value could be read from the injected KafkaProperties, but this feels hacky:

@Bean
public KafkaListenerContainerFactory kafkaListenerContainerFactory( final KafkaProperties properties, final ConsumerFactory<String, DeserObject> consumerFactory ) {
    ConcurrentKafkaListenerContainerFactory<String, DeserObject> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory( consumerFactory );
    factory.getContainerProperties().setAckMode( properties.getListener().getAckMode() );
    return factory;
}

Upvotes: 3

Views: 4794

Answers (1)

Gary Russell

Reputation: 174759

You need to use Boot's ConcurrentKafkaListenerContainerFactoryConfigurer to apply the Boot properties (the spring.kafka.listener.* settings, including ack-mode) to your factory. This is the standard bean definition that Boot uses when you don't provide one:

@Bean
@ConditionalOnMissingBean(name = "kafkaListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
        ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
        ConsumerFactory<Object, Object> kafkaConsumerFactory) {
    ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<Object, Object>();
    configurer.configure(factory, kafkaConsumerFactory);
    return factory;
}
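
Applied to the configuration in the question, this could look roughly like the sketch below. It assumes the consumerFactory bean and the DeserObject class from the question. Because configure(...) is declared with ConsumerFactory<Object, Object>, the typed factory is passed through a raw cast here; that cast is a workaround chosen for this illustration, not something taken from Boot's own code.

```java
import org.springframework.boot.autoconfigure.kafka.ConcurrentKafkaListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;

@Configuration
public class KafkaListenerConfig {

    // DeserObject is the payload type from the question; the consumerFactory bean
    // is assumed to be the one defined there (String key, JSON value).
    @Bean
    @SuppressWarnings({ "unchecked", "rawtypes" })
    public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<String, DeserObject> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        // Applies the Boot listener properties (e.g. spring.kafka.listener.ack-mode)
        // and sets the consumer factory on the container factory.
        configurer.configure(factory, (ConsumerFactory) consumerFactory);
        return factory;
    }
}
```

With this in place, ack-mode: manual in the .yaml should be picked up without calling setAckMode in Java.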

Upvotes: 4
