phdezann
phdezann

Reputation: 33

How to recover from exceptions sent by producer.send() in Spring Cloud Stream

We experienced the following scenario :

By default, MessageChannel.send() returns true and doesn't throw any exception, even if, eventually, the KafkaProducer can't send successfully the message. We observe, about 30 seconds after this call, the following message in the logs : Expiring 10 record(s) for helloworld-topic-1 due to 30008 ms has passed since batch creation plus linger time

In our case, this is not acceptable as we have to be sure that all messages are eventually delivered to Kafka, at the moment of the return of the call to MessageChannel.send().

We turned on spring.cloud.stream.kafka.bindings.<channelName>.producer.sync to true which does exactly as the documentation describes. It blocks the caller for the producer's acknowledgment of the success or the failure of the delivery (MessageTimeoutException, InterruptedException, ExecutionException), all of this controlled by KafkaProducerMessageHandler. It seems to be the best approach for us as the performance impact is negligible in our case.

But, do we need to take care of the retry ourselves if an exception is thrown ? (in our client code with @Retryable for instance)

Here is a simple project to experiment : https://github.com/phdezann/spring-cloud-bus-kafka-helloworld

Upvotes: 2

Views: 1574

Answers (1)

Gary Russell
Gary Russell

Reputation: 174514

If the send() is performed on the @StreamListener thread and the exception is thrown back to the binder, the binder retry configuration will perform retries.

However, since you are doing the send on an HTTP thread you will need to do your own retry (call send within the scope of a RetryTemplate()) or make the controller method @Retryable.

Upvotes: 1

Related Questions