James Bond

Reputation: 25

How to implement retries on a Spring Kafka producer?

I'd like to set up 3 retries. What I have in mind: if the producer has retried 3 times and still hasn't sent the message, the message should go to a local dead-letter topic (DLT).

Here's what I'm working with right now. This is the configuration:

@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
        ConcurrentKafkaListenerContainerFactoryConfigurer configure,
        ConsumerFactory<Object, Object> kafkaConsumerFactory,
        KafkaTemplate<Object, Object> template) {
    ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    configure.configure(factory, kafkaConsumerFactory);
    // Listener-side error handling: retry a failed record twice with no delay,
    // then publish it to the dead-letter topic
    factory.setErrorHandler(new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(template), new FixedBackOff(0L, 2)));
    return factory;
}

This is the main application:

@SpringBootApplication
@EnableKafka
@Retryable(value = Exception.class, maxAttemptsExpression = "3",
        backoff = @Backoff(delayExpression = "1000"))
public class KafkaApplication {

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(KafkaApplication.class, args);
        TestBean testBean = context.getBean(TestBean.class);
        while (true) {
            for (int i = 0; i < 50; i++) {
                try {
                    Thread.sleep(5000);
                } catch (Exception e) {
                    System.out.println("exception" + e.getMessage());
                }
                testBean.send("This is message " + i);
            }
            context.getBean(Consumer.Listener.class).latch.await(60, TimeUnit.SECONDS);
        }
    }
}

This is the listener:

@KafkaListener(topics = AppConfiguration.topic, groupId = AppConfiguration.groupid, containerFactory = "kafkaListenerContainerFactory")
public void listen(@Payload String message, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
    try {
        System.out.println("Successfully Received: " + message + " (partition: " + partition + ")");
        this.latch.countDown();
    } catch (Exception e) {
        System.out.println("Error in sending record");
        System.out.println(e);
        e.printStackTrace();
    }
}

Upvotes: 0

Views: 13940

Answers (2)

dbaltor

Reputation: 3363

Kafka producers support retries directly through their configuration, as mentioned here.

The best way to do this, though, is to combine the delivery.timeout.ms, request.timeout.ms and retry.backoff.ms properties, which together control how many retries happen within a given period of time, as explained in the docs.

If you are using Spring Boot, you can configure a bean as follows:

  @Bean(name = "ProducerConfig")
  public Map<String, Object> producerConfig() {
    return Map.of(
        ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers,
        // Enable safely ordered retries
        ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true",
        ProducerConfig.ACKS_CONFIG, "all",
        // Bound retries by time: overall delivery deadline, per-request timeout,
        // and the wait between attempts (all in milliseconds)
        ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 5000,
        ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 200,
        ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 500
    );
  }
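
To get a feel for how these three properties interact, here is a back-of-the-envelope sketch in plain Java. It assumes a simplified model that is not Kafka's actual scheduling: every attempt fails by hitting request.timeout.ms, the wait between attempts is a fixed retry.backoff.ms, and linger.ms is 0. RetryBudgetSketch and estimateAttempts are hypothetical names used only for illustration.

```java
// Rough estimate only: a simplified model, not how the Kafka client schedules retries.
final class RetryBudgetSketch {

    // Counts the attempts that can run to their request timeout before
    // delivery.timeout.ms expires. Attempt k (0-based) finishes at
    // k * (requestTimeoutMs + retryBackoffMs) + requestTimeoutMs.
    static long estimateAttempts(long deliveryTimeoutMs, long requestTimeoutMs, long retryBackoffMs) {
        long cycle = requestTimeoutMs + retryBackoffMs;
        // Integer division: the last attempt needs no backoff after it,
        // so one backoff interval is added to the budget.
        return (deliveryTimeoutMs + retryBackoffMs) / cycle;
    }

    public static void main(String[] args) {
        // Values from the configuration above
        System.out.println(estimateAttempts(5000, 200, 500));
        // A tighter delivery deadline with the same request timeout and backoff
        System.out.println(estimateAttempts(2500, 200, 500));
    }
}
```

Under this model, the values in the configuration above leave room for about 7 attempts (the first send plus roughly 6 retries), while a delivery.timeout.ms around 2500 would leave room for about 4 attempts, that is, 3 retries. Treat these as estimates and check the behavior against your own client version and broker.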

Upvotes: 4

xuanzjie
xuanzjie

Reputation: 51

If you only need three retries and don't want to introduce extra components, you can use try/catch together with a local queue.

The simplest approach is to retry immediately after an error occurs. Often, though, you will want to wait a few minutes between retries. In that case, put the failed message into a queue in the catch block, and have another method take messages from the queue and send them again. Kafka may already solve this problem for you, since some exceptions are retried automatically; I suggest reading the producer configuration docs: https://kafka.apache.org/21/documentation.html#producerconfigs

The following is the simplest implementation:

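public void sendToKafka(String topic, String key, String value, Integer retryTimes) {
    try {
        // send() is asynchronous, so most failures are only reported through the
        // returned future; get() blocks until the send succeeds or fails
        kafkaTemplate.send(topic, key, value).get();
    } catch (Exception e) {
        // retry immediately until 3 retries have been made, then give up
        if (retryTimes < 3) {
            sendToKafka(topic, key, value, retryTimes + 1);
        }
    }
}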
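
The queue-based variant described above (park the failed message in the catch block and resend it later from another method) could be sketched roughly as follows. This is a minimal illustration under assumptions, not a drop-in implementation: Sender is a hypothetical stand-in for a blocking Kafka send such as message -> kafkaTemplate.send(topic, message).get(), the deadLetters list stands in for publishing to a dead-letter topic, and the class is not thread-safe.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical sketch of deferred retries with a local queue.
final class QueuedRetrySketch {

    // Stand-in for a blocking send that throws when delivery fails.
    interface Sender {
        void send(String message) throws Exception;
    }

    // A failed message together with the number of retries already made.
    private static final class Pending {
        final String message;
        final int retries;

        Pending(String message, int retries) {
            this.message = message;
            this.retries = retries;
        }
    }

    private static final int MAX_RETRIES = 3;

    private final Sender sender;
    private final Queue<Pending> retryQueue = new ArrayDeque<>();
    // Stand-in for a dead-letter topic
    final List<String> deadLetters = new ArrayList<>();

    QueuedRetrySketch(Sender sender) {
        this.sender = sender;
    }

    // First attempt; on failure the message is parked for a later retry.
    void send(String message) {
        attempt(new Pending(message, 0));
    }

    // Call periodically (for example from a scheduled task) to resend parked messages.
    void retryPending() {
        int size = retryQueue.size(); // only handle what was queued before this pass
        for (int i = 0; i < size; i++) {
            Pending p = retryQueue.poll();
            attempt(new Pending(p.message, p.retries + 1));
        }
    }

    private void attempt(Pending p) {
        try {
            sender.send(p.message);
        } catch (Exception e) {
            if (p.retries < MAX_RETRIES) {
                retryQueue.add(p);          // try again on a later pass
            } else {
                deadLetters.add(p.message); // retries exhausted
            }
        }
    }
}
```

With a sender that always fails, a message is attempted once by send() and three more times by successive retryPending() calls before it lands in deadLetters. In a real application the dead-letter step would be another send to a separate topic, and the queue would need to be thread-safe if retryPending() runs on a different thread.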

Upvotes: 1
