Reputation: 77
My use case is to receive Kafka messages, make multiple attempts at a REST call, and upon exhaustion dump the failed message onto a Kafka DLQ topic.
@StreamListener(EventSource.SOME_CHANNEL)
public void processMessage(Message<?> unsolicitedMessage) {
    String aString = .....
    oneService.act(aString);
}
@Retryable is working perfectly in terms of handling the retry logic for multiple attempts:
@Retryable(value = {OneException.class, TwoException.class}, maxAttempts = 3,
        backoff = @Backoff(delay = 1000))
public boolean act(String message, String endPoint) {
    // do stuff
}
For Spring Cloud Stream's inbuilt Kafka DLQ publishing to kick in (enableDlq: true), the exception needs to bubble up to the @StreamListener-annotated method so that the Kafka binder can publish the failed message to the DLQ.
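Roughly, the relevant binder configuration looks like this (the binding name someChannel and the DLQ topic name are just placeholders for illustration):

spring:
  cloud:
    stream:
      kafka:
        bindings:
          someChannel:                # placeholder binding name
            consumer:
              enableDlq: true
              dlqName: some-topic-dlq # optional; defaults to error.<destination>.<group>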
However, in doing so, I am not able to leverage the @Recover-annotated method, where the flow lands perfectly after the retries are exhausted:
@Recover
public boolean recoverOnToDLQ(OneException ex, String message, String endPoint) {
    throw ex; // required for the StreamListener Kafka DLQ to kick in!
}
Question: Is there a way to trigger Kafka DLQ publishing from within the @Recover method without re-throwing the exception?
If I use it only to re-throw, I don't believe I am making efficient use of the tighter control it provides. Handling the DLQ publishing there would also simplify unit tests and capture the logic more clearly at code level. Any thoughts on how to handle this better?
I am on the latest versions of spring-cloud, spring-cloud-stream and spring-retry as of this date.
Upvotes: 0
Views: 2703
Reputation: 174759
It can be done, but the question is "why would you want to?".
The binder has retry built in; simply throw the exception back to the binder and it will send the data to the DLQ after retries are exhausted. Retry is configured using binder consumer retry properties. You would not need one line of additional code.
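For example, a minimal sketch of the relevant consumer retry properties (someChannel is a placeholder binding name; combine this with the kafka consumer enableDlq: true setting from the question):

spring:
  cloud:
    stream:
      bindings:
        someChannel:                  # placeholder binding name
          consumer:
            maxAttempts: 3            # total attempts, including the first delivery
            backOffInitialInterval: 1000
            backOffMultiplier: 2.0
            backOffMaxInterval: 10000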
By using @Retryable, you are nesting 2 RetryTemplates (unless you disable the binder retry by setting the consumer maxAttempts property to 1).
Of course, you can configure your own DLQ destination and simply write whatever you want to it in your recoverer. But, to use the binder's built-in DLQ publisher you'd have to craft a special ErrorMessage (with the properties the publisher needs) and send it to the binding's error channel. The publisher needs the raw Kafka ConsumerRecord, which you would have to re-create since it's not available to the listener.
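If you do go the custom-DLQ route, a minimal sketch of the recoverer could look like this (it assumes a KafkaTemplate<String, String> bean and a hypothetical my-service-dlq topic, and reuses the exception/retry settings from the question):

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.retry.annotation.Backoff;
import org.springframework.retry.annotation.Recover;
import org.springframework.retry.annotation.Retryable;
import org.springframework.stereotype.Service;

@Service
public class OneService {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public OneService(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    @Retryable(value = {OneException.class, TwoException.class}, maxAttempts = 3,
            backoff = @Backoff(delay = 1000))
    public boolean act(String message, String endPoint) {
        // call the REST endpoint; throw OneException/TwoException on failure
        return true;
    }

    @Recover
    public boolean recoverOnToDLQ(OneException ex, String message, String endPoint) {
        // write the failed payload to your own DLQ topic instead of re-throwing
        kafkaTemplate.send("my-service-dlq", message);
        return false;
    }
}

Note that the recoverer only has the payload, so any record metadata (topic, partition, offset) you want on the DLQ record has to be added by you.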
All-in-all, for your use case, it seems much simpler to just use the binder's retry config.
EDIT
In 2.0.x you can add a RetryTemplate @Bean to your application and it will be used for all consumer bindings (overriding the binding properties).
This template can be customized with any retry policy, retryable exceptions, etc.
In 2.0.2 it will have to be qualified with @StreamRetryTemplate; this was a fix because any RetryTemplate bean overrode the binding properties, which may not be the desired behavior.
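A minimal sketch of such a bean (the policy here just mirrors the @Retryable settings from the question; the class and method names are placeholders):

import java.util.HashMap;
import java.util.Map;

import org.springframework.cloud.stream.annotation.StreamRetryTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.retry.backoff.FixedBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;

@Configuration
public class RetryConfig {

    @Bean
    @StreamRetryTemplate // the qualifier needed from 2.0.2 onward
    public RetryTemplate consumerRetryTemplate() {
        RetryTemplate retryTemplate = new RetryTemplate();

        // retry only these exceptions, 3 attempts in total
        Map<Class<? extends Throwable>, Boolean> retryableExceptions = new HashMap<>();
        retryableExceptions.put(OneException.class, true);
        retryableExceptions.put(TwoException.class, true);
        retryTemplate.setRetryPolicy(new SimpleRetryPolicy(3, retryableExceptions));

        // fixed 1 second back-off between attempts
        FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
        backOffPolicy.setBackOffPeriod(1000L);
        retryTemplate.setBackOffPolicy(backOffPolicy);

        return retryTemplate;
    }
}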
Upvotes: 2