Reputation: 359
I'm using Spring Boot along with Spring Integration version 4.2.5.RELEASE.
I'm trying to set up a retry advice correctly for one of my Spring Integration flows that invokes a remote HTTP service.
The simplified draft of the flow looks like this:
1. Gateway --> 2. flow that sets required http headers --> 3. common outbound gateway template --> 4. http-outbound-gateway.
What I want to achieve:
If the HTTP operation hangs or the remote service is not available, the request should be tried at most 5 times. After that, the Message should be put into a recovery channel where I can perform some additional actions, and I would also like to return a valid response (FetchNewIdsResponseDTO) to the gateway that is the entry point of my flow.
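As a rough illustration of the intended behaviour, here is a minimal plain-Java sketch with no Spring involved. The names callWithRetry and RetryWithRecoverySketch, the exception and the fallback string are hypothetical and only stand in for the real flow; five attempts in total are assumed.

```java
import java.util.concurrent.Callable;
import java.util.function.Function;

public class RetryWithRecoverySketch {

    // Calls the operation up to maxAttempts times. If every attempt throws,
    // the last exception is handed to the recovery function, and whatever
    // that function returns becomes the caller's result.
    static <T> T callWithRetry(Callable<T> operation, int maxAttempts,
                               Function<Exception, T> recovery) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return operation.call();
            } catch (Exception e) {
                last = e; // remember the failure and try again
            }
        }
        return recovery.apply(last);
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Hypothetical stand-in for the remote HTTP call: it always fails.
        Callable<String> failing = () -> {
            calls[0]++;
            throw new IllegalStateException("remote service is not available");
        };
        // Hypothetical recovery: build a valid "error" response for the caller.
        Function<Exception, String> recovery = e -> "fallback response: " + e.getMessage();

        String result = callWithRetry(failing, 5, recovery);
        System.out.println(calls[0] + " attempts, result = " + result);
    }
}
```

The point of the sketch is that the caller still gets a normal return value after the retries are exhausted, which is what I expect from the entry gateway.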
Relevant code for the individual flow elements:
1. Gateway:
@MessagingGateway(errorChannel = GATEWAY_ERROR_CHANNEL_NAME,
        defaultReplyTimeout = "2000")
public interface FetchNewIdsGateway {
    @Gateway(requestChannel = GATEWAY_INPUT_CHANNEL_NAME)
    FetchNewIdsResponseDTO fetch(Object o);
}
2. flow that sets required http headers:
@Bean
IntegrationFlow fetchNewIdsOutboundGatewayFlow(@Value("${worker-app.hostname}") String workerAppHostname,
        CommonOutboundGatewayTemplate.CommonOutboundGateway gateway) {
    return IntegrationFlows.from(IntegrationFlowConstants.OUTBOUND_GATEWAY_TEMPLATE_INPUT_CHANNEL)
            .enrichHeaders(h -> h
                    .header(CommonMessageHeaders.WORKER_APP_REQUEST_HTTP_METHOD, HttpMethod.GET.toString())
                    .header(CommonMessageHeaders.WORKER_APP_LOCATION, workerAppHostname)
                    .header(CommonMessageHeaders.WORKER_APP_REQUEST_PATH, ServiceIntegrationControllerEndpoints.FETCH_NEW_IDS)
                    .header(CommonMessageHeaders.FLOW_NAME, IntegrationFlowTypes.FETCH_NEW_IDS.getFlowName()))
            .handle(Object.class, gateway::send)
            .get();
}
3. common outbound gateway template:
@MessagingGateway(name = COMMON_OUTBOUND_GATEWAY_NAME,
        defaultReplyChannel = GATEWAY_OUTPUT_CHANNEL,
        errorChannel = GATEWAY_ERROR_CHANNEL,
        defaultReplyTimeout = "2000")
public interface CommonOutboundGateway {
    @Gateway(requestChannel = COMMON_OUTBOUND_GATEWAY_INPUT_CHANNEL)
    Object send(Object o, @Headers Map<String, Object> headers);
}
4. http-outbound-gateway:
@Bean
IntegrationFlow commonOutboundGatewayFlow(@Qualifier(REST_TEMPLATE_NAME) RestTemplate restTemplate,
        @Qualifier(COMMON_OUTBOUND_GATEWAY_RECOVERY_CHANNEL) MessageChannel recoveryChannel) {
    return IntegrationFlows.from(COMMON_OUTBOUND_GATEWAY_INPUT_CHANNEL)
            .enrichHeaders(h -> h.header(CommonMessageHeaders.CONTENT_TYPE, CommonMessageHeaders.CONTENT_TYPE_JSON_VALUE))
            .handle(Http.outboundGateway("{url}{path}", restTemplate)
                            .httpMethodFunction(m -> m.getHeaders().get(CommonMessageHeaders.WORKER_APP_REQUEST_HTTP_METHOD))
                            .charset("UTF-8")
                            .encodeUri(false)
                            .mappedRequestHeaders(CommonMessageHeaders.CONTENT_TYPE)
                            .mappedResponseHeaders(CommonMessageHeaders.CONTENT_TYPE)
                            .expectedResponseType(String.class)
                            .uriVariable("url", m -> m.getHeaders().get(CommonMessageHeaders.WORKER_APP_LOCATION))
                            .uriVariable("path", m -> m.getHeaders().get(CommonMessageHeaders.WORKER_APP_REQUEST_PATH))
                    , e -> e.advice(retryAdvice(recoveryChannel)))
            .channel(COMMON_OUTBOUND_GATEWAY_OUTPUT_TRANSFORMER_CHANNEL)
            .get();
}
Retry advice:
@Bean
public RequestHandlerRetryAdvice retryAdvice(@Qualifier(COMMON_OUTBOUND_GATEWAY_RECOVERY_CHANNEL) MessageChannel recoveryChannel) {
    RequestHandlerRetryAdvice advice = new RequestHandlerRetryAdvice();
    advice.setRecoveryCallback(new ErrorMessageSendingRecoverer(recoveryChannel));
    advice.setRetryTemplate(retryTemplate());
    return advice;
}

@Bean
public RetryTemplate retryTemplate() {
    RetryTemplate retryTemplate = new RetryTemplate();
    SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
    retryPolicy.setMaxAttempts(5);
    ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
    backOffPolicy.setInitialInterval(500);
    backOffPolicy.setMultiplier(5.0);
    backOffPolicy.setMaxInterval(1000L);
    retryTemplate.setRetryPolicy(retryPolicy);
    retryTemplate.setBackOffPolicy(backOffPolicy);
    return retryTemplate;
}
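For reference, the pauses these back-off settings should produce can be worked out with a small plain-Java model. This is only a sketch of a capped exponential back-off, not Spring Retry's actual code; BackOffSketch and waits are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

public class BackOffSketch {

    // Pauses between attempts for a capped exponential back-off: start at
    // initialInterval, multiply after each pause, never exceed maxInterval.
    static List<Long> waits(long initialInterval, double multiplier,
                            long maxInterval, int maxAttempts) {
        List<Long> result = new ArrayList<>();
        long interval = initialInterval;
        // maxAttempts attempts leave maxAttempts - 1 pauses between them
        for (int i = 1; i < maxAttempts; i++) {
            result.add(Math.min(interval, maxInterval));
            interval = (long) Math.min(interval * multiplier, (double) maxInterval);
        }
        return result;
    }

    public static void main(String[] args) {
        // Values taken from the retryTemplate() bean above.
        System.out.println(waits(500, 5.0, 1000, 5)); // prints [500, 1000, 1000, 1000]
    }
}
```

With a multiplier of 5.0 and a cap of 1000 ms, only the first pause is 500 ms; every later pause is clamped to 1000 ms.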
And finally, the flow that could perform additional recovery actions and transforms the Message (error handling):
@Bean
public IntegrationFlow fetchNewIdsErrorRecovery() {
    return IntegrationFlows.from(CommonOutboundGatewayTemplate.COMMON_OUTBOUND_GATEWAY_RECOVERY_CHANNEL)
            // -->ADDITIONAL RECOVERY ACTIONS WOULD GO HERE, BEFORE TRANSFORMATION!
            .<MessagingException, Message>transform(p -> {
                Object failedPayload = p.getFailedMessage().getPayload();
                return MessageBuilder.createMessage(failedPayload, p.getFailedMessage().getHeaders());
            })
What I can see from the logs:
2017-01-11 21:56:32,318 2505 [main] DEBUG o.s.retry.support.RetryTemplate - Retry: count=0
2017-01-11 21:56:32,831 3018 [main] DEBUG o.s.retry.support.RetryTemplate - Checking for rethrow: count=1
2017-01-11 21:56:32,831 3018 [main] DEBUG o.s.retry.support.RetryTemplate - Retry: count=1
2017-01-11 21:56:33,833 4020 [main] DEBUG o.s.retry.support.RetryTemplate - Checking for rethrow: count=2
2017-01-11 21:56:33,833 4020 [main] DEBUG o.s.retry.support.RetryTemplate - Retry: count=2
2017-01-11 21:56:34,835 5022 [main] DEBUG o.s.retry.support.RetryTemplate - Checking for rethrow: count=3
2017-01-11 21:56:34,835 5022 [main] DEBUG o.s.retry.support.RetryTemplate - Retry: count=3
2017-01-11 21:56:35,836 6023 [main] DEBUG o.s.retry.support.RetryTemplate - Checking for rethrow: count=4
2017-01-11 21:56:35,836 6023 [main] DEBUG o.s.retry.support.RetryTemplate - Retry: count=4
2017-01-11 21:56:35,838 6025 [main] DEBUG o.s.retry.support.RetryTemplate - Checking for rethrow: count=5
2017-01-11 21:56:35,838 6025 [main] DEBUG o.s.retry.support.RetryTemplate - Retry failed last attempt: count=5
2017-01-11 21:56:50,900 21087 [main] INFO o.s.i.handler.LoggingHandler - ErrorMessage [payload=org.springframework.messaging.MessagingException: java.lang.AssertionError: No further requests expected: HTTP GET http://testWorkerAppHostname/newIds; nested exception is org.springframework.integration.handler.advice.AbstractRequestHandlerAdvice$ThrowableHolderException: java.lang.AssertionError: No further requests expected: HTTP GET http://testWorkerAppHostname/newIds, headers={history=common-outbound-gateway-recovery-channel,fetch-new-ids-error-log-channel,trace-log-channel, id=f27f1633-9e5f-a3eb-f1b7-6bc04cecf167, timestamp=1484168210900}]
2017-01-11 21:56:50,902 21089 [main] ERROR FetchNewIdsFlowLoggers$$EnhancerBySpringCGLIB$$ccbdfda2 - [STEP 1::fetch new IDs] Error while executing the flow, DETAILS: HEADERS: {history=common-outbound-gateway-recovery-channel,fetch-new-ids-error-log-channel,com.szyszy.superscraper.integration.flows.fetchnewids.FetchNewIdsFlowLoggers.fetchNewIdsErrorLogHandler.serviceActivator.handler,fetchNewIdsErrorLogHandler, id=fe6fed47-bcca-ac61-3f09-6f573e2b0cde, timestamp=1484168210901}PAYLOAD: org.springframework.messaging.MessagingException: java.lang.AssertionError: No further requests expected: HTTP GET http://testWorkerAppHostname/newIds; nested exception is org.springframework.integration.handler.advice.AbstractRequestHandlerAdvice$ThrowableHolderException: java.lang.AssertionError: No further requests expected: HTTP GET http://testWorkerAppHostname/newIds
2017-01-11 21:56:50,903 21090 [main] INFO o.s.i.handler.LoggingHandler - ErrorMessage [payload=org.springframework.messaging.MessagingException: java.lang.AssertionError: No further requests expected: HTTP GET http://testWorkerAppHostname/newIds; nested exception is org.springframework.integration.handler.advice.AbstractRequestHandlerAdvice$ThrowableHolderException: java.lang.AssertionError: No further requests expected: HTTP GET http://testWorkerAppHostname/newIds, headers={history=common-outbound-gateway-recovery-channel,trace-log-channel, id=651ded47-22c4-d2a6-7326-c08960ff9247, timestamp=1484168210903}]
2017-01-11 21:56:52,484 22671 [main] INFO o.s.i.handler.LoggingHandler - ErrorMessage [payload=org.springframework.messaging.MessagingException: Dispatcher failed to deliver Message; nested exception is org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available, headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@4993febc, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@4993febc, history=common-outbound-gateway-error-transformer-channel,common-outbound-gateway-error-log-channel,trace-log-channel, id=da0fff16-9fef-69e9-ed9d-bfcde7e01e29, timestamp=1484168212484}]
2017-01-11 21:56:52,486 22673 [main] ERROR CommonOutboundGatewayTemplateLoggers$$EnhancerBySpringCGLIB$$2fe9ed5c - [OUTBOUND GATEWAY OPERATION] Failed to send message to the worker application, DETAILS: HEADERS: {replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@4993febc, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@4993febc, history=common-outbound-gateway-error-transformer-channel,common-outbound-gateway-error-log-channel,com.szyszy.superscraper.integration.commonflows.outbound.CommonOutboundGatewayTemplateLoggers.commonOutboundGatewayErrorLogHandler.serviceActivator.handler,commonOutboundGatewayErrorLogHandler, id=08ca5452-68d4-4464-6fa7-961632330405, timestamp=1484168212485}PAYLOAD: org.springframework.messaging.MessagingException: Dispatcher failed to deliver Message; nested exception is org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available -- Failed payload: : org.springframework.messaging.MessagingException: java.lang.AssertionError: No further requests expected: HTTP GET http://testWorkerAppHostname/newIds; nested exception is org.springframework.integration.handler.advice.AbstractRequestHandlerAdvice$ThrowableHolderException: java.lang.AssertionError: No further requests expected: HTTP GET http://testWorkerAppHostname/newIds
2017-01-11 21:56:52,489 22676 [main] INFO o.s.i.handler.LoggingHandler - ErrorMessage [payload=org.springframework.messaging.MessagingException: Dispatcher failed to deliver Message; nested exception is org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available, headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@4993febc, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@4993febc, history=common-outbound-gateway-error-transformer-channel,trace-log-channel, id=590c49a7-3d4b-ced2-b2bc-3250508e1986, timestamp=1484168212489}]
2017-01-11 21:56:52,490 22677 [main] INFO o.s.i.handler.LoggingHandler - ErrorMessage [payload=org.springframework.messaging.MessagingException: Dispatcher failed to deliver Message; nested exception is org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available, headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@4993febc, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@4993febc, history=common-outbound-gateway-error-transformer-channel,com.szyszy.superscraper.integration.commonflows.outbound.CommonOutboundGatewayTemplate.gatewayErrorChannel.bridgeTo.handler,common-outbound-gateway-error-response-transformer-channel,trace-log-channel, id=e12b2aa5-b471-bed0-c627-7620cb9c8bcf, timestamp=1484168212490}]
This part is especially interesting:
payload=org.springframework.messaging.MessagingException: Dispatcher failed to deliver Message; nested exception is org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available
Since my error channel is reached, the response object (FetchNewIdsResponseDTO) is created correctly and goes back to the first gateway. However, after the Message reached the recovery channel, a DestinationResolutionException was thrown, and I don't understand why this is happening.
My fetchNewIdsErrorRecovery flow builds a new Message, copying the original headers, so as far as I understand the replyChannel header should be available.
How can I perform additional actions (through the recovery channel) and get rid of this exception at the same time?
Could someone please enlighten me about the purpose of the advice's recoveryCallback?
Is it meant for error handling, or rather for performing some additional actions while relying on the HTTP gateway's error handling in the end?
UPDATE:
Adding the .bridge(null) call before .get() solved the issue. This is the log output after the fix:
2017-01-16 22:11:42,376 2260 [main] DEBUG o.s.retry.support.RetryTemplate - Retry: count=0
2017-01-16 22:11:42,890 2774 [main] DEBUG o.s.retry.support.RetryTemplate - Checking for rethrow: count=1
2017-01-16 22:11:42,890 2774 [main] DEBUG o.s.retry.support.RetryTemplate - Retry: count=1
2017-01-16 22:11:43,891 3775 [main] DEBUG o.s.retry.support.RetryTemplate - Checking for rethrow: count=2
2017-01-16 22:11:43,891 3775 [main] DEBUG o.s.retry.support.RetryTemplate - Retry: count=2
2017-01-16 22:11:44,891 4775 [main] DEBUG o.s.retry.support.RetryTemplate - Checking for rethrow: count=3
2017-01-16 22:11:44,891 4775 [main] DEBUG o.s.retry.support.RetryTemplate - Retry: count=3
2017-01-16 22:11:45,893 5777 [main] DEBUG o.s.retry.support.RetryTemplate - Checking for rethrow: count=4
2017-01-16 22:11:45,893 5777 [main] DEBUG o.s.retry.support.RetryTemplate - Retry: count=4
2017-01-16 22:11:45,893 5777 [main] DEBUG o.s.retry.support.RetryTemplate - Checking for rethrow: count=5
2017-01-16 22:11:45,894 5778 [main] DEBUG o.s.retry.support.RetryTemplate - Retry failed last attempt: count=5
2017-01-16 22:11:45,895 5779 [main] INFO o.s.i.handler.LoggingHandler - ErrorMessage [payload=org.springframework.messaging.MessagingException: java.lang.AssertionError: No further requests expected: HTTP GET http://testWorkerAppHostname/newIds; nested exception is org.springframework.integration.handler.advice.AbstractRequestHandlerAdvice$ThrowableHolderException: java.lang.AssertionError: No further requests expected: HTTP GET http://testWorkerAppHostname/newIds, headers={history=common-outbound-gateway-recovery-channel,fetch-new-ids-error-log-channel,trace-log-channel, id=5872d71d-29fa-a71f-0c9f-800d4decc6cb, timestamp=1484601105895}]
2017-01-16 22:11:45,897 5781 [main] ERROR FetchNewIdsFlowLoggers$$EnhancerBySpringCGLIB$$f05c6801 - [STEP 1::fetch new IDs] Error while executing the flow, DETAILS: HEADERS: {history=common-outbound-gateway-recovery-channel,fetch-new-ids-error-log-channel,com.szyszy.superscraper.integration.flows.fetchnewids.FetchNewIdsFlowLoggers.fetchNewIdsErrorLogHandler.serviceActivator.handler,fetchNewIdsErrorLogHandler, id=e176b462-d02e-0603-b6ec-9a79588794e8, timestamp=1484601105896}PAYLOAD: org.springframework.messaging.MessagingException: java.lang.AssertionError: No further requests expected: HTTP GET http://testWorkerAppHostname/newIds; nested exception is org.springframework.integration.handler.advice.AbstractRequestHandlerAdvice$ThrowableHolderException: java.lang.AssertionError: No further requests expected: HTTP GET http://testWorkerAppHostname/newIds
2017-01-16 22:11:45,898 5782 [main] INFO o.s.i.handler.LoggingHandler - ErrorMessage [payload=org.springframework.messaging.MessagingException: java.lang.AssertionError: No further requests expected: HTTP GET http://testWorkerAppHostname/newIds; nested exception is org.springframework.integration.handler.advice.AbstractRequestHandlerAdvice$ThrowableHolderException: java.lang.AssertionError: No further requests expected: HTTP GET http://testWorkerAppHostname/newIds, headers={history=common-outbound-gateway-recovery-channel,trace-log-channel, id=7cc7379b-0320-661e-657c-d1d5a0c6a741, timestamp=1484601105898}]
2017-01-16 22:11:45,900 5784 [main] INFO o.s.i.handler.LoggingHandler - GenericMessage [payload=FetchNewIdsResponseDTO{links=null, ids=null, RestResponseDTO{errorMessage='Error while fetching new ids - Remote server is not available!', success=false}}, headers={workerAppLocation=http://testWorkerAppHostname, replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@4d0402b, workerAppRequestPath=/newIds, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@4d0402b, workerAppRequestHTTPMethod=GET, history=fetchNewIdsFlow$FetchNewIdsGateway,fetch-new-ids-gateway-in-channel,in-child-context-individual-outbound-gateway-template-channel,fetchNewIdsOutboundGatewayFlow.channel#0,commonOutboundToWorkerGateway,common-outbound-gateway-input-channel,commonOutboundGatewayFlow.channel#0,fetchNewIdsErrorResponseTransformer2.channel#0,trace-log-channel, id=ee760170-a73f-ddcf-8a1a-c0835f773374, flowName=FETCH_NEW_IDS, Content-Type=application/json, timestamp=1484601105900
Upvotes: 2
Views: 3736
Reputation: 121177
Oh! I found the problem.
You have to place .bridge(null) just after .<MessagingException, Message>transform(), before get().
The problem is with the replyChannel header: the framework consults the requestMessage for that purpose, not the reply object.
I'm not sure we are going to change anything in the framework on this matter, but it is a fact that the reply does not affect the output channel resolution decision.
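To illustrate the explanation above, here is a simplified plain-Java model of that decision. It is only a sketch under assumptions, not the framework's code: Msg, resolveDestination and the channel names are hypothetical stand-ins, and headers are modelled as a plain Map.

```java
import java.util.HashMap;
import java.util.Map;

public class ReplyChannelResolutionSketch {

    // Hypothetical, simplified stand-in for a message: a payload plus headers.
    static final class Msg {
        final Object payload;
        final Map<String, Object> headers;

        Msg(Object payload, Map<String, Object> headers) {
            this.payload = payload;
            this.headers = headers;
        }
    }

    // Models the decision: with no explicit output channel, the destination
    // is looked up in the REQUEST headers. The reply is passed in only to
    // show that its headers are never consulted.
    static Object resolveDestination(Object outputChannel, Msg request, Msg reply) {
        if (outputChannel != null) {
            return outputChannel;
        }
        Object replyChannel = request.headers.get("replyChannel");
        if (replyChannel == null) {
            throw new IllegalStateException("no output-channel or replyChannel header available");
        }
        return replyChannel;
    }

    public static void main(String[] args) {
        Map<String, Object> originalHeaders = new HashMap<>();
        originalHeaders.put("replyChannel", "temporaryReplyChannel");

        // The message arriving on the recovery channel has no replyChannel header.
        Msg errorMessage = new Msg("MessagingException", new HashMap<>());
        // The transformer's result copies the failed message's headers.
        Msg transformed = new Msg("failed payload", originalHeaders);

        // Transformer as the last endpoint: its request is the error message.
        try {
            resolveDestination(null, errorMessage, transformed);
        } catch (IllegalStateException e) {
            System.out.println("transformer last: " + e.getMessage());
        }

        // With a bridge appended, the transformer has an output channel, and
        // the bridge then sees the transformed message as its own request.
        Object toBridge = resolveDestination("bridgeInput", errorMessage, transformed);
        Object fromBridge = resolveDestination(null, transformed, transformed);
        System.out.println(toBridge + " -> " + fromBridge);
    }
}
```

In this model the bridge does nothing to the message itself; it only turns the transformed message into a request, so the copied replyChannel header is finally the one being consulted.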
We could do that via the Routing Slip pattern, but you would have to set a header for it in advance anyway.
The JIRA ticket on the matter.
Upvotes: 1