tosky
tosky

Reputation: 31

Spring Integration - Control retry logic in http outboundAdapter

I configured a route in Spring Integration 5.4.4 that read from an AMQP queue and write to an http outbound adapter. I'm not able to control retries when, for example, I programmatically declare a wrong http hostname for the http outbound adapter (Cause java.net.UnknownHostException).

This seems to generate an infinite retry (message not acked on RabbitMQ container), even if I configured the RetryTemplate logic in the amqpInboundAdapter.

My goal should be: requeue the message for N times in case http outbound adapter is in error, discard the message otherwise and don't requeue again.

Code here:

Spring Integration route

public IntegrationFlow route(AmqpInboundChannelAdapterSMLCSpec amqpInboundChannelAdapterSMLCSpec) {
        return IntegrationFlows
                .from(amqpInboundChannelAdapterSMLCSpec)
                .filter(validJsonFilter())
                .enrichHeaders(h -> h.header("X-Insert-Key",utboundHttpConfig.outboundHttpToken))
                .enrichHeaders(h -> h.header("Content-Encoding", "gzip"))
                .enrichHeaders(h -> h.header("Content-Type", "application/json"))
                .handle(Http.outboundChannelAdapter(outboundHttpConfig.outboundHttpUrl)                         .mappedRequestHeaders("X-Insert-Key")
                        .httpMethod(HttpMethod.POST)
                )
                .get();
    }

AmqpInboundChannelAdapterSMLCSpec

public AmqpInboundChannelAdapterSMLCSpec gatewayEventInboundAmqpAdapter(ConnectionFactory connectionFactory) {

        RetryTemplate retryTemplate = new RetryTemplate();

        exceptionClassifierRetryPolicy.setPolicyMap(exceptionPolicy);
        retryTemplate.setBackOffPolicy(new ExponentialBackOffPolicy());
        retryTemplate.setRetryPolicy(new SimpleRetryPolicy(1));
        retryTemplate.setThrowLastExceptionOnExhausted(true);

        return Amqp
                .inboundAdapter(connectionFactory, rabbitConfig.inboundQueue())
                .configureContainer(c -> c
                        .concurrentConsumers(3)
                        .maxConcurrentConsumers(5)
                        .receiveTimeout(2000)
                        .alwaysRequeueWithTxManagerRollback(false)
                )
                .retryTemplate(retryTemplate);
    }

Any ideas?

Thanks a lot

Upvotes: 1

Views: 544

Answers (1)

Artem Bilan
Artem Bilan

Reputation: 121177

requeue the message for N times in case http outbound adapter is in error, discard the message otherwise and don't requeue again.

When you use a retry on the AMQP MessageListenerContainer, there is on requeue: the retry is done in the memory without round trips to the broker.

Anyway what you do so far is OK. Only what you are missing is a RejectAndDontRequeueRecoverer to be configured for that Amqp.inboundAdapter() to decide what to do with an AMQP message when all the retry attempts are exhausted.

Unfortunately the direct MessageRecoverer configuration for channel adapter has been added since version 5.5: https://docs.spring.io/spring-integration/docs/5.5.0-M3/reference/html/whats-new.html#x5.5-amqp.

For the current version it has to be done via recoveryCallback(RecoveryCallback<?> recoveryCallback) option and respective delegation:

   .recoveryCallback(context -> {
        org.springframework.amqp.core.Message messageToReject =
            (org.springframework.amqp.core.Message) RetrySynchronizationManager.getContext()
                    .getAttribute(AmqpMessageHeaderErrorMessageStrategy.AMQP_RAW_MESSAGE);
        throw new ListenerExecutionFailedException("Retry Policy Exhausted",
                new AmqpRejectAndDontRequeueException(context.getLastThrowable()), messageToReject);
    }))

Upvotes: 1

Related Questions