Reputation: 25
I'd like to configure 3 retries. What I have in mind: if the producer has retried 3 times and still has not sent the message, the message should go to a local dead-letter topic (DLT).
Here is what I have so far. This is the configuration:
@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
        ConcurrentKafkaListenerContainerFactoryConfigurer configure,
        ConsumerFactory<Object, Object> kafkaConsumerFactory,
        KafkaTemplate<Object, Object> template) {
    ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    configure.configure(factory, kafkaConsumerFactory);
    factory.setErrorHandler(new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(template), new FixedBackOff(0L, 2)));
    return factory;
}
This is the main application
@SpringBootApplication
@EnableKafka
@Retryable(value = Exception.class, maxAttemptsExpression = "3",
        backoff = @Backoff(delayExpression = "1000"))
public class KafkaApplication {
    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(KafkaApplication.class, args);
        TestBean testBean = context.getBean(TestBean.class);
        while (true) {
            for (int i = 0; i < 50; i++) {
                try {
                    Thread.sleep(5000);
                } catch (Exception e) {
                    System.out.println("exception" + e.getMessage());
                }
                testBean.send("This is message " + i);
            }
            context.getBean(Consumer.Listener.class).latch.await(60, TimeUnit.SECONDS);
        }
    }
}
This is the listener
@KafkaListener(topics = AppConfiguration.topic, groupId = AppConfiguration.groupid, containerFactory = "kafkaListenerContainerFactory")
public void listen(@Payload String message, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
    try {
        System.out.println("Successfully Received: " + message + " (partition: " + partition + ")");
        this.latch.countDown();
    } catch (Exception e) {
        System.out.println("Error in sending record");
        System.out.println(e);
        e.printStackTrace();
    }
}
Upvotes: 0
Views: 13940
Reputation: 3363
Kafka producers support retries directly through their configuration.
The best way to do so, though, is to combine the delivery.timeout.ms, request.timeout.ms and retry.backoff.ms properties, which together control how many retries can happen within a given period of time, as explained in the docs.
If you are using Spring Boot, you can configure a bean as follows:
@Bean(name = "ProducerConfig")
public Map<String, Object> producerConfig() {
    return Map.of(
        ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers,
        // Enable safely ordered retries
        ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true",
        ProducerConfig.ACKS_CONFIG, "all",
        // Bound the retries: total time budget, per-request timeout, wait between attempts
        ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 5000,
        ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 200,
        ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 500
    );
}
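To get a feel for how these three values bound the number of attempts, here is a rough back-of-the-envelope sketch. It assumes the worst case where every request runs into its timeout and the backoff between attempts is fixed; the class and method names are made up for illustration, and the real client may behave somewhat differently, so treat the result as an estimate only.

```java
class RetryBudget {
    // Rough count of attempts that can run to their request timeout
    // before the overall delivery timeout is used up.
    static long estimateAttempts(long deliveryTimeoutMs, long requestTimeoutMs, long retryBackoffMs) {
        long attempts = 0;
        long elapsed = 0;
        while (elapsed + requestTimeoutMs <= deliveryTimeoutMs) {
            elapsed += requestTimeoutMs; // the attempt runs until it times out
            attempts++;
            elapsed += retryBackoffMs;   // wait before the next attempt
        }
        return attempts;
    }

    public static void main(String[] args) {
        // Values from the configuration above: 5000 ms budget, 200 ms timeout, 500 ms backoff
        System.out.println(estimateAttempts(5000, 200, 500)); // prints 7
    }
}
```

With the values above this gives about 7 attempts. For roughly 3 retries you would shrink delivery.timeout.ms or increase retry.backoff.ms accordingly.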
Upvotes: 4
Reputation: 51
If you just want to retry 3 times without adding extra components, you can use a try/catch combined with a local in-memory queue.
The simplest option is to retry immediately after an error occurs. Often, though, you will want to retry after a delay, for example every few minutes. In that case, put the failed message into a queue in the catch block, and have another method take messages off the queue and send them again. Kafka may already solve this problem for you, since some exceptions are retried automatically; I suggest reading: https://kafka.apache.org/21/documentation.html#producerconfigs
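The queue idea could look roughly like the sketch below. It is plain Java with no Kafka dependency: the `sender` is a hypothetical stand-in for a blocking send that throws on failure, and `RetryQueue`, `retryFailed` and `abandoned` are names invented for this example. In a real application `retryFailed()` would be called from a scheduled task.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

class RetryQueue {
    // A message together with the number of send attempts made so far.
    private static final class Pending {
        final String message;
        final int attempts;
        Pending(String message, int attempts) {
            this.message = message;
            this.attempts = attempts;
        }
    }

    private final Queue<Pending> failed = new ArrayDeque<>();
    private final List<String> abandoned = new ArrayList<>();
    private final Consumer<String> sender; // hypothetical: throws if the send fails
    private final int maxAttempts;

    RetryQueue(Consumer<String> sender, int maxAttempts) {
        this.sender = sender;
        this.maxAttempts = maxAttempts;
    }

    // First attempt: a failure parks the message in the queue instead of propagating.
    synchronized void send(String message) {
        attempt(new Pending(message, 0));
    }

    // Resend everything that was queued before this pass started.
    synchronized void retryFailed() {
        int pending = failed.size();
        for (int i = 0; i < pending; i++) {
            attempt(failed.poll());
        }
    }

    // Messages that failed maxAttempts times and were given up on.
    synchronized List<String> abandoned() {
        return new ArrayList<>(abandoned);
    }

    private void attempt(Pending p) {
        try {
            sender.accept(p.message);
        } catch (RuntimeException e) {
            int attempts = p.attempts + 1;
            if (attempts < maxAttempts) {
                failed.add(new Pending(p.message, attempts));
            } else {
                abandoned.add(p.message);
            }
        }
    }

    public static void main(String[] args) {
        RetryQueue queue = new RetryQueue(m -> { throw new IllegalStateException("broker down"); }, 3);
        queue.send("hello");
        queue.retryFailed();
        queue.retryFailed();
        System.out.println(queue.abandoned());
    }
}
```

Note that the queue lives in memory only, so anything still waiting is lost if the application stops.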
The following is the simplest implementation:
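public void sendToKafka(String topic, String key, String value, int retryTimes) {
    try {
        // send() is asynchronous; get() blocks so that a failed send throws here
        kafkaTemplate.send(topic, key, value).get();
    } catch (Exception e) {
        // Retry immediately, up to 3 retries after the first attempt
        if (retryTimes < 3) {
            sendToKafka(topic, key, value, retryTimes + 1);
        }
    }
}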
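To connect this back to the question, the same idea can be extended so that a message goes to a dead-letter topic once the retries are used up. The sketch below is plain Java and only illustrates the control flow: the `sender` is a hypothetical stand-in for a blocking send that throws on failure, and the ".DLT" suffix is just a naming choice for this example.

```java
import java.util.function.BiConsumer;

class RetryThenDeadLetter {
    private final BiConsumer<String, String> sender; // hypothetical: (topic, value), throws on failure
    private final int maxRetries;

    RetryThenDeadLetter(BiConsumer<String, String> sender, int maxRetries) {
        this.sender = sender;
        this.maxRetries = maxRetries;
    }

    // Returns the topic the message finally landed on.
    String send(String topic, String value) {
        // One initial attempt plus maxRetries retries.
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                sender.accept(topic, value);
                return topic;
            } catch (RuntimeException e) {
                // Ignore and fall through to the next attempt.
            }
        }
        // Retries exhausted: publish to the dead-letter topic instead.
        String deadLetterTopic = topic + ".DLT";
        sender.accept(deadLetterTopic, value);
        return deadLetterTopic;
    }

    public static void main(String[] args) {
        RetryThenDeadLetter producer = new RetryThenDeadLetter((topic, value) -> {
            if (topic.equals("orders")) {
                throw new IllegalStateException("send failed");
            }
        }, 3);
        System.out.println(producer.send("orders", "hello")); // prints orders.DLT
    }
}
```

Keep in mind that if the failure is the broker itself being unreachable, the send to the dead-letter topic will fail as well, so this only helps with problems specific to the original topic or record.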
Upvotes: 1