Reputation: 323
May I know how to implement dead letter queue example for the below code? The same input record should be published to some dlq topic.
reactiveKafkaConsumerTemplate
.receiveAutoAck()
.map(ConsumerRecord::value)
.flatMap(this::consumeWithRetry)
.onErrorContinue((error, value)->log.error("something bad happened while consuming : {}", error.getMessage()))
.retryWhen(Retry.backoff(30, Duration.of(10, ChronoUnit.SECONDS)))
.subscribe();
public Mono<Void> consumeWithRetry(MessageRecord message){
return consume(message)
.retry(2);
}
public Mono<Void> consumeWithRetry(MessageRecord message){
return Mono.defer(()->consume(message))
.retry(2);
}
Upvotes: 0
Views: 257