alexanoid
alexanoid

Reputation: 25770

Spring Kafka and exactly once delivery guarantee

I use Spring Kafka and Spring Boot and just wondering how to configure my consumer, for example:

@KafkaListener(topics = "${kafka.topic.post.send}", containerFactory = "postKafkaListenerContainerFactory")
public void sendPost(ConsumerRecord<String, Post> consumerRecord, Acknowledgment ack) {

    // do some logic

    ack.acknowledge();
}

to use the exactly once delivery guarantee?

Should I only add org.springframework.transaction.annotation.Transactional annotation over sendPost method and that's it or do I need to perform some extra steps in order to achieve this?

UPDATED

This is my current config

@Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(KafkaProperties kafkaProperties, KafkaTransactionManager<Object, Object> transactionManager) {

        kafkaProperties.getProperties().put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, kafkaConsumerMaxPollIntervalMs);
        kafkaProperties.getProperties().put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaConsumerMaxPollRecords);

        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        //factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
        factory.getContainerProperties().setTransactionManager(transactionManager);
        factory.setConsumerFactory(consumerFactory(kafkaProperties));

        return factory;
    }


    @Bean
    public Map<String, Object> producerConfigs() {

        Map<String, Object> props = new HashMap<>();

        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 15000000);

        return props;
    }

    @Bean
    public ProducerFactory<String, Post> postProducerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, Post> postKafkaTemplate() {
        return new KafkaTemplate<>(postProducerFactory());
    }

    @Bean
    public ProducerFactory<String, Update> updateProducerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, Update> updateKafkaTemplate() {
        return new KafkaTemplate<>(updateProducerFactory());
    }

    @Bean
    public ProducerFactory<String, Message> messageProducerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, Message> messageKafkaTemplate() {
        return new KafkaTemplate<>(messageProducerFactory());
    }

but it fails with the following error:

***************************
APPLICATION FAILED TO START
***************************

Description:

Parameter 0 of method kafkaTransactionManager in org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration required a single bean, but 3 were found:
    - postProducerFactory: defined by method 'postProducerFactory' in class path resource [com/example/domain/configuration/messaging/KafkaProducerConfig.class]
    - updateProducerFactory: defined by method 'updateProducerFactory' in class path resource [com/example/domain/configuration/messaging/KafkaProducerConfig.class]
    - messageProducerFactory: defined by method 'messageProducerFactory' in class path resource [com/example/domain/configuration/messaging/KafkaProducerConfig.class]

What am I doing wrong ?

Upvotes: 7

Views: 5099

Answers (1)

Gary Russell
Gary Russell

Reputation: 174494

You should not use manual acknowledgments. Instead, inject a KafkaTransactionManager into the listener container and the container will send the offset to the transaction when the listener method exits normally (or rollback otherwise).

You should not do acks via the consumer for exactly once.

EDIT

application.yml

spring:
  kafka:
    consumer:
      auto-offset-reset: earliest
      enable-auto-commit: false
      properties:
        isolation:
          level: read_committed
    producer:
      transaction-id-prefix: myTrans.

App

@SpringBootApplication
public class So52570118Application {

    public static void main(String[] args) {
        SpringApplication.run(So52570118Application.class, args);
    }

    @Bean // override boot's auto-config to add txm
    public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<Object, Object> kafkaConsumerFactory,
            KafkaTransactionManager<Object, Object> transactionManager) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(factory, kafkaConsumerFactory);
        factory.getContainerProperties().setTransactionManager(transactionManager);
        return factory;
    }

    @Autowired
    private KafkaTemplate<String, String> template;

    @KafkaListener(id = "so52570118", topics = "so52570118")
    public void listen(String in) throws Exception {
        System.out.println(in);
        Thread.sleep(5_000);
        this.template.send("so52570118out", in.toUpperCase());
        System.out.println("sent");
    }

    @KafkaListener(id = "so52570118out", topics = "so52570118out")
    public void listenOut(String in) {
        System.out.println(in);
    }

    @Bean
    public ApplicationRunner runner() {
        return args -> this.template.executeInTransaction(t -> t.send("so52570118", "foo"));
    }

    @Bean
    public NewTopic topic1() {
        return new NewTopic("so52570118", 1, (short) 1);
    }

    @Bean
    public NewTopic topic2() {
        return new NewTopic("so52570118out", 1, (short) 1);
    }

}

Upvotes: 3

Related Questions