Reputation: 31
I configured a route in Spring Integration 5.4.4 that read from an AMQP queue and write to an http outbound adapter. I'm not able to control retries when, for example, I programmatically declare a wrong http hostname for the http outbound adapter (Cause java.net.UnknownHostException).
This seems to generate an infinite retry (message not acked on RabbitMQ container), even if I configured the RetryTemplate logic in the amqpInboundAdapter.
My goal should be: requeue the message for N times in case http outbound adapter is in error, discard the message otherwise and don't requeue again.
Code here:
Spring Integration route
public IntegrationFlow route(AmqpInboundChannelAdapterSMLCSpec amqpInboundChannelAdapterSMLCSpec) {
return IntegrationFlows
.from(amqpInboundChannelAdapterSMLCSpec)
.filter(validJsonFilter())
.enrichHeaders(h -> h.header("X-Insert-Key",utboundHttpConfig.outboundHttpToken))
.enrichHeaders(h -> h.header("Content-Encoding", "gzip"))
.enrichHeaders(h -> h.header("Content-Type", "application/json"))
.handle(Http.outboundChannelAdapter(outboundHttpConfig.outboundHttpUrl) .mappedRequestHeaders("X-Insert-Key")
.httpMethod(HttpMethod.POST)
)
.get();
}
AmqpInboundChannelAdapterSMLCSpec
public AmqpInboundChannelAdapterSMLCSpec gatewayEventInboundAmqpAdapter(ConnectionFactory connectionFactory) {
RetryTemplate retryTemplate = new RetryTemplate();
exceptionClassifierRetryPolicy.setPolicyMap(exceptionPolicy);
retryTemplate.setBackOffPolicy(new ExponentialBackOffPolicy());
retryTemplate.setRetryPolicy(new SimpleRetryPolicy(1));
retryTemplate.setThrowLastExceptionOnExhausted(true);
return Amqp
.inboundAdapter(connectionFactory, rabbitConfig.inboundQueue())
.configureContainer(c -> c
.concurrentConsumers(3)
.maxConcurrentConsumers(5)
.receiveTimeout(2000)
.alwaysRequeueWithTxManagerRollback(false)
)
.retryTemplate(retryTemplate);
}
Any ideas?
Thanks a lot
Upvotes: 1
Views: 544
Reputation: 121177
requeue the message for N times in case http outbound adapter is in error, discard the message otherwise and don't requeue again.
When you use a retry on the AMQP MessageListenerContainer, there is on requeue: the retry is done in the memory without round trips to the broker.
Anyway what you do so far is OK. Only what you are missing is a RejectAndDontRequeueRecoverer
to be configured for that Amqp.inboundAdapter()
to decide what to do with an AMQP message when all the retry attempts are exhausted.
Unfortunately the direct MessageRecoverer
configuration for channel adapter has been added since version 5.5
: https://docs.spring.io/spring-integration/docs/5.5.0-M3/reference/html/whats-new.html#x5.5-amqp.
For the current version it has to be done via recoveryCallback(RecoveryCallback<?> recoveryCallback)
option and respective delegation:
.recoveryCallback(context -> {
org.springframework.amqp.core.Message messageToReject =
(org.springframework.amqp.core.Message) RetrySynchronizationManager.getContext()
.getAttribute(AmqpMessageHeaderErrorMessageStrategy.AMQP_RAW_MESSAGE);
throw new ListenerExecutionFailedException("Retry Policy Exhausted",
new AmqpRejectAndDontRequeueException(context.getLastThrowable()), messageToReject);
}))
Upvotes: 1