Reputation: 531
Please refer to the attached diagram for the setup. Explanation below.
There is a common input channel that receives requests. From this input channel, there are two flows:
Flow 1 - stores the request into DB
Flow 2 - Sends the request for Business Processing/forwarding to other external systems
I wanted flow 1 and flow 2 to be independent of each other. So I put Flow 1 on an executor channel. This way, an error in flow 1 will not disrupt flow 2.
Explanation of Flow 1:
What I have:
In the code shown in the green box, I defined an ExpressionEvaluatingRequestHandlerAdvice so that any error on the executor channel is sent to the error channel. I assumed that the advice would be applied to the executor channel automatically.
Instead, when an error occurs, the message is reposted to the 'Common input channel' and processed repeatedly until the queue fills up.
What I need:
I want any error on the executor channel to be sent to the error channel, where it is quietly logged and the message is discarded.
CODE THAT READS FROM COMMON INPUT CHANNEL AND PUTS ON EXECUTOR CHANNEL:
@Configuration
@EnableIntegration
public class InputChanneltoExecutorChannelConfig {

    //DEFINING THE EXECUTOR CHANNEL
    @Bean
    public TaskExecutor taskExecutor() {
        return new SimpleAsyncTaskExecutor();
    }

    @Bean(name="executorChannelToDB")
    public ExecutorChannel outboundRequests() {
        return new ExecutorChannel(taskExecutor());
    }

    //DEFINE FAILURE CHANNEL FOR USE IN ExpressionEvaluatingRequestHandlerAdvice
    @Bean(name = "DBFailureChannel")
    public static MessageChannel getFailureChannel() {
        return new DirectChannel();
    }

    //MAIN METHOD THAT READS FROM INPUT CHANNEL AND SENDS TO EXECUTOR CHANNEL
    @Bean
    public IntegrationFlow outboundtoDB() {
        return IntegrationFlows
                .from("commonInputChannel")
                /*
                 * We publish the msg to be stored into the DB onto a executor
                 * channel (so that the DB operation is processed on a separate
                 * thread).
                 */
                .channel("executorChannelToDB").get();
        /****************************************************************************
         *********************************************************
         * How do I route the error from executor channel to error channel over here?
         **********************************************************
         ****************************************************************************/
    }

    /*
     * Create an advice bean to handle DB errors. In case of failure, send
     * response to a separate channel.
     */
    @Bean
    public ExpressionEvaluatingRequestHandlerAdvice expressionAdvice() {
        ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
        advice.setFailureChannelName("DBFailureChannel");
        advice.setOnFailureExpressionString("'##Error while storing request into DB'");
        advice.setTrapException(true);
        return advice;
    }

    /*
     * We create a separate flow for DB failure because in future we may need
     * other actions such as retries/notify support in addition to logging.
     */
    @Bean
    public IntegrationFlow failure() {
        return IntegrationFlows.from("DBFailureChannel")
                .channel("errorChannel").get();
    }
}
UPDATE: As per Gary's suggestion, I now set the ERROR_CHANNEL and REPLY_CHANNEL headers.
@Bean
public IntegrationFlow outboundtoDB() {
    return IntegrationFlows
            .from("commonInputChannel")
            //Setting Headers
            .enrichHeaders(h -> h.header(MessageHeaders.ERROR_CHANNEL, "errorChannel", true))
            .enrichHeaders(h -> h.header(MessageHeaders.REPLY_CHANNEL, "DBSuccessChannel", true))
            .channel("executorChannelToDB").get();
}
The DBSuccessChannel flow handles the response like this:
@Bean
public IntegrationFlow success() {
    return IntegrationFlows
            .from("DBSuccessChannel")
            .wireTap(
                    flow -> flow.handle(msg -> logger
                            .info("Response from storing in DB : "
                                    + msg.getPayload()))).get();
}
But I still get this error:
2018-09-26 23:34:47.398 ERROR 17186 --- [SimpleAsyncTaskExecutor-465] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessageHandlingException: nested exception is java.time.format.DateTimeParseException: Text 'sample creation timestamp' could not be parsed at index 0, failedMessage=GenericMessage [payload=com.td.sba.iep.schema.InstructionRs@37919153, headers={errorChannel=errorChannel, jms_destination=commonInputChannel, Solace_JMS_Prop_IS_Reply_Message=false, priority=0, jms_timestamp=1538018141672, JMS_Solace_isXML=true, replyChannel=DBSuccessChannel, jms_redelivered=true, JMS_Solace_DeliverToOne=false, JMS_Solace_ElidingEligible=false, JMS_Solace_DeadMsgQueueEligible=false, id=ff6c2ea6-b6d6-c67a-7943-6b7db33bb977, jms_messageId=ID:49.37.4.163d608166190664e70:0, timestamp=1538019287394}]
Here, jms_destination is still set to the input channel, and the error keeps getting reposted to commonInputChannel. Can you please help?
Reputation: 174664
The advice won't help since it only applies to that endpoint, not the downstream flow. In any case, even if it did, the handoff to the executor will be successful and any downstream exceptions are handled by the executor (which gets wrapped in an ErrorHandlingTaskExecutor with a MessagePublishingErrorHandler).
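To illustrate why the sending side never sees the failure, here is a minimal plain-Java sketch of the same idea. It is only an analogy, not Spring Integration's implementation: the class name `ExecutorHandoffSketch`, the `errorHandling` wrapper and the messages are all hypothetical. The sender hands the task to an executor and returns normally; the exception is thrown later on the worker thread, where only the wrapper around the executor can catch it.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Plain-Java analogy; all names are hypothetical, none of this is Spring API. */
final class ExecutorHandoffSketch {

    /** Wraps an Executor so that exceptions thrown by a task go to an error handler. */
    static Executor errorHandling(Executor delegate, Consumer<Throwable> errorHandler) {
        return task -> delegate.execute(() -> {
            try {
                task.run();
            } catch (Throwable t) {
                errorHandler.accept(t); // invoked on the worker thread
            }
        });
    }

    /** Records what the sender observed and what the error handler received. */
    static List<String> demo() {
        List<String> events = new CopyOnWriteArrayList<>();
        CountDownLatch handled = new CountDownLatch(1);

        Executor executor = errorHandling(
                task -> new Thread(task).start(), // one new thread per task
                t -> {
                    events.add("error handler: " + t.getMessage());
                    handled.countDown();
                });

        try {
            // The handoff itself succeeds; the task fails later on another thread.
            executor.execute(() -> {
                throw new IllegalStateException("DB write failed");
            });
            events.add("sender: handoff succeeded");
        } catch (RuntimeException e) {
            events.add("sender: saw " + e.getMessage()); // not reached in this sketch
        }

        try {
            handled.await(5, TimeUnit.SECONDS); // wait for the worker thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return events;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

The order of the two recorded events is not deterministic, since they come from different threads, but the sender's catch block is never entered. Anything that only wraps the send, as the advice does, is in the same position as that catch block.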
Try replacing that component with a header enricher, and set the errorChannel header. Or you can wrap the TaskExecutor (TE) yourself in an ErrorHandlingTaskExecutor (EHTE) with a MessagePublishingErrorHandler (MPEH) configured with your error channel (the executor channel will detect that the TE is already an EHTE).
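A rough sketch of the second option, wrapping the executor yourself, might look like the following. The class name `DbExecutorChannelConfig` and the bean name `dbTaskExecutor` are made up for illustration, and it assumes the framework's default `errorChannel` bean is the target. It has not been run against the configuration in the question, so treat it as a starting point only.

```java
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.integration.channel.ExecutorChannel;
import org.springframework.integration.channel.MessagePublishingErrorHandler;
import org.springframework.integration.util.ErrorHandlingTaskExecutor;
import org.springframework.messaging.MessageChannel;

@Configuration
public class DbExecutorChannelConfig { // hypothetical class name

    // Wrap the real executor so that exceptions thrown by downstream handlers
    // are published to the given channel as ErrorMessages.
    @Bean
    public ErrorHandlingTaskExecutor dbTaskExecutor(
            @Qualifier("errorChannel") MessageChannel errorChannel) {
        MessagePublishingErrorHandler errorHandler = new MessagePublishingErrorHandler();
        // Assumption: the default errorChannel is where failures should go.
        errorHandler.setDefaultErrorChannel(errorChannel);
        return new ErrorHandlingTaskExecutor(new SimpleAsyncTaskExecutor(), errorHandler);
    }

    // The channel receives an executor that is already error-handling,
    // so it is used as supplied.
    @Bean(name = "executorChannelToDB")
    public ExecutorChannel executorChannelToDB(ErrorHandlingTaskExecutor dbTaskExecutor) {
        return new ExecutorChannel(dbTaskExecutor);
    }
}
```

As far as I know, an errorChannel header on the failed message still takes precedence over the default channel set here, so the two approaches can be combined.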
EDIT
This works fine for me...
@SpringBootApplication
public class So52526134Application {

    public static void main(String[] args) {
        SpringApplication.run(So52526134Application.class, args);
    }

    @Bean
    public IntegrationFlow mainFlow() {
        return IntegrationFlows.from(() -> "foo", e -> e.poller(Pollers.fixedDelay(5000)))
                .enrichHeaders(h -> h.header(MessageHeaders.ERROR_CHANNEL, "myErrors.input"))
                .channel(MessageChannels.executor(executor()))
                .handle((p, h) -> {
                    throw new RuntimeException("foo");
                })
                .get();
    }

    @Bean
    public IntegrationFlow myErrors() {
        return f -> f.handle((p, h) -> {
                    System.out.println("in my error flow");
                    return p;
                })
                .handle(System.out::println);
    }

    @Bean
    public TaskExecutor executor() {
        return new ThreadPoolTaskExecutor();
    }
}
and
in my error flow
ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: ...