Reputation: 579
I‘ve been pulling my hairs on how to implement the retry pattern (retry the WHOLE flow) for a Spring-Integration poller flow. Please find below my (erroneous) source-code (doesn't work).
What am I doing wrong ?
(if I put a breakpoint on the line throwing an exception, it's only hit once)
thanks a lot in advance for your time and your expertise.
Best Regards
nkjp
PS: maybe try to extend AbstractHandleMessageAdvice with a RetryTemplate ?
return IntegrationFLows.from(SOME_QUEUE_CHANNEL)
.transform(p -> p, e -> e.poller(Pollers.fixedDelay(5000)
.advice(RetryInterceptorBuilder.stateless().maxAttempts(5).backOffOptions(1,2,10).build())))
.transform(p -> {
if (true) {
throw new RuntimeException("KABOOM");
}
return p;
})
.channel(new NullChannel())
.get();
Upvotes: 2
Views: 1054
Reputation: 121442
If you add poller.advice()
, then an Advice
is applied to the whole flow starting with poll()
method. Since you have already polled a message from that queue, there is nothing to poll from it on the next attempt. It is kinda anti-pattern to use retry for non-transactional queues: you don't rollback transactions so your data doesn't come back to store to be available for the next poll()
.
There is no way at the moment to retry a whole sub-flow from some point, but you definitely can use a RequestHandlerRetryAdvice
on the specific erroneous endpoint like that your transform()
with KABOOM
exception:
.transform(p -> {
if (true) {
throw new RuntimeException("KABOOM");
}
return p;
}, e -> e.advice(new RequestHandlerRetryAdvice()))
See its setRetryTemplate(RetryTemplate retryTemplate)
for more retry options instead of just 3 attempts by default.
To make for a sub-flow, we need to consider to implement a HandleMessageAdvice
.
Something like this:
.transform(p -> p, e -> e.poller(Pollers.fixedDelay(500000))
.advice(new HandleMessageAdvice() {
RetryOperationsInterceptor delegate =
RetryInterceptorBuilder.stateless()
.maxAttempts(5)
.backOffOptions(1, 2, 10)
.build();
@Override
public Object invoke(MethodInvocation invocation) throws Throwable {
return delegate.invoke(invocation);
}
}))
But again: it's not a poller
advice., it is an endpoint one on its MessageHandler.handleMessage()
.
Upvotes: 2