user2038596

Reputation: 579

How to retry an entire poller flow in Spring Integration

I've been pulling my hair out trying to implement the retry pattern (retrying the WHOLE flow) for a Spring Integration poller flow. Below is my source code, which doesn't work.

What am I doing wrong?
(If I put a breakpoint on the line that throws the exception, it is hit only once.)

Thanks a lot in advance for your time and expertise.

Best Regards

nkjp

PS: Maybe I should try extending AbstractHandleMessageAdvice with a RetryTemplate?

return IntegrationFlows.from(SOME_QUEUE_CHANNEL)
    .transform(p -> p, e -> e.poller(Pollers.fixedDelay(5000)
        .advice(RetryInterceptorBuilder.stateless().maxAttempts(5).backOffOptions(1, 2, 10).build())))
    .transform(p -> {
        if (true) {
            throw new RuntimeException("KABOOM");
        }
        return p;
    })
    .channel(new NullChannel())
    .get();

Upvotes: 2

Views: 1054

Answers (1)

Artem Bilan

Reputation: 121442

If you add poller.advice(), the advice is applied to the whole flow, starting with the poll() method. Since the message has already been polled from the queue, there is nothing left to poll on the next attempt. Using retry with non-transactional queues is something of an anti-pattern: no transaction is rolled back, so the data does not return to the store to be available for the next poll().
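To make the effect concrete, here is a minimal plain-Java sketch of what retrying around poll() amounts to. It is only a model under simplifying assumptions, not Spring Integration code: the class PollerRetrySketch and the method retryAroundPoll are hypothetical names, and an in-memory BlockingQueue stands in for the non-transactional queue channel.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Plain-Java model of retrying around poll(); no Spring classes are involved.
class PollerRetrySketch {

    /**
     * Models advice placed around the whole poll cycle: every attempt calls
     * poll() again and hands whatever it receives to the handler.
     * Returns what each attempt polled (null means the queue was empty).
     */
    static List<String> retryAroundPoll(BlockingQueue<String> queue,
                                        Consumer<String> handler,
                                        int maxAttempts) {
        List<String> polled = new ArrayList<>();
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            String message = queue.poll(); // removes the head; nothing puts it back
            polled.add(message);
            if (message == null) {
                break; // nothing to process, so the cycle ends quietly
            }
            try {
                handler.accept(message);
                break; // handled successfully
            } catch (RuntimeException ex) {
                // swallowed so that the loop moves on to the next attempt
            }
        }
        return polled;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("payload");
        List<String> polled = retryAroundPoll(queue, m -> {
            throw new RuntimeException("KABOOM");
        }, 5);
        System.out.println(polled); // prints [payload, null]
    }
}
```

The handler runs only once: the second attempt finds the queue empty, which matches the breakpoint being hit a single time.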

There is no built-in way at the moment to retry a whole sub-flow from some point, but you can certainly use a RequestHandlerRetryAdvice on the specific failing endpoint, such as your transform() that throws the KABOOM exception:

.transform(p -> {
    if (true) {
        throw new RuntimeException("KABOOM");
    }
    return p;
}, e -> e.advice(new RequestHandlerRetryAdvice()))

See its setRetryTemplate(RetryTemplate retryTemplate) for more retry options than the default of 3 attempts.
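As a rough sketch of those options, the advice could be configured along these lines to mirror the 5 attempts and the (1, 2, 10) back-off from the question. This assumes spring-integration-core and spring-retry are on the classpath and a Spring Integration version in which RequestHandlerRetryAdvice still exposes setRetryTemplate(); RetryAdviceFactory and retryAdvice() are hypothetical helper names.

```java
import org.springframework.integration.handler.advice.RequestHandlerRetryAdvice;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;

// Hypothetical helper class; only the Spring types are real library classes.
final class RetryAdviceFactory {

    private RetryAdviceFactory() {
    }

    static RequestHandlerRetryAdvice retryAdvice() {
        // Exponential back-off: start at 1 ms, double each time, cap at 10 ms.
        ExponentialBackOffPolicy backOff = new ExponentialBackOffPolicy();
        backOff.setInitialInterval(1);
        backOff.setMultiplier(2);
        backOff.setMaxInterval(10);

        RetryTemplate retryTemplate = new RetryTemplate();
        retryTemplate.setRetryPolicy(new SimpleRetryPolicy(5)); // 5 attempts in total
        retryTemplate.setBackOffPolicy(backOff);

        RequestHandlerRetryAdvice advice = new RequestHandlerRetryAdvice();
        advice.setRetryTemplate(retryTemplate);
        return advice;
    }
}
```

It would then be attached to the endpoint with e -> e.advice(RetryAdviceFactory.retryAdvice()).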

To retry a sub-flow, consider implementing a HandleMessageAdvice. Something like this:

.transform(p -> p, e -> e.poller(Pollers.fixedDelay(500000))
    .advice(new HandleMessageAdvice() {

        RetryOperationsInterceptor delegate =
                RetryInterceptorBuilder.stateless()
                        .maxAttempts(5)
                        .backOffOptions(1, 2, 10)
                        .build();

        @Override
        public Object invoke(MethodInvocation invocation) throws Throwable {
            return delegate.invoke(invocation);
        }
    }))

But again: this is not a poller advice; it is an endpoint advice applied to its MessageHandler.handleMessage().
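The difference can be modeled in plain Java as follows. This is only an illustrative sketch, not Spring Integration code: EndpointRetrySketch and retryAroundHandler are hypothetical names, and the Consumer stands in for handleMessage(). The point is that the message has already been received before the retry loop starts, so every attempt sees the same message.

```java
import java.util.function.Consumer;

// Plain-Java model of retrying around the handler; no Spring classes are involved.
class EndpointRetrySketch {

    /**
     * Models advice placed around the handler call only: the message is
     * passed in, so each attempt processes the same message again.
     * Returns the attempt number that succeeded, or rethrows the last failure.
     */
    static int retryAroundHandler(String message,
                                  Consumer<String> handler,
                                  int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                handler.accept(message); // same message on every attempt
                return attempt;
            } catch (RuntimeException ex) {
                last = ex; // remember the failure and try again
            }
        }
        throw last; // all attempts failed
    }

    public static void main(String[] args) {
        int[] calls = {0};
        int attempts = retryAroundHandler("payload", m -> {
            if (++calls[0] < 3) { // fail on the first two calls only
                throw new RuntimeException("KABOOM");
            }
        }, 5);
        System.out.println(attempts); // prints 3
    }
}
```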

Upvotes: 2
