Deepboy

Reputation: 531

How do I route errors from executor channel to error channel?

Please refer to the attached diagram for the setup. Explanation below.

There is a common input channel that receives requests. From this input channel, there are two flows:

  1. Flow 1 - stores the request into DB

  2. Flow 2 - Sends the request for Business Processing/forwarding to other external systems

I wanted flow 1 and flow 2 to be independent of each other. So I put Flow 1 on an executor channel. This way, an error in flow 1 will not disrupt flow 2.

Explanation of Flow 1:

  1. From the common input channel, a component reads the request and puts it on an executor channel.
  2. From the executor channel, the class DBStore reads the request and stores it in the DB.
  3. I also have an error channel (common to all classes in the project) that quietly logs the error.

What I have:

In the code shown in the green box, I have defined an ExpressionEvaluatingRequestHandlerAdvice so that any error on the executor channel is sent to the error channel. I assumed that the ExpressionEvaluatingRequestHandlerAdvice would be applied to the executor channel automatically.

Instead, if there is an error, the message is reposted to the 'Common input channel' and processed repeatedly until the queue fills up.

What I need:

I want any error on the executor channel to be sent to the error channel, where it is quietly logged and the message is discarded.

CODE THAT READS FROM COMMON INPUT CHANNEL AND PUTS ON EXECUTOR CHANNEL:

@Configuration
@EnableIntegration
public class InputChanneltoExecutorChannelConfig {

//DEFINING THE EXECUTOR CHANNEL
    @Bean
    public TaskExecutor taskExecutor() {
        return new SimpleAsyncTaskExecutor();
    }

    @Bean(name="executorChannelToDB")
    public ExecutorChannel outboundRequests() {
        return new ExecutorChannel(taskExecutor());
    }
//DEFINE FAILURE CHANNEL FOR USE IN ExpressionEvaluatingRequestHandlerAdvice
    @Bean(name = "DBFailureChannel")
    public static MessageChannel getFailureChannel() {
        return new DirectChannel();
    }   

//MAIN METHOD THAT READS FROM INPUT CHANNEL AND SENDS TO EXECUTOR CHANNEL
    @Bean
    public IntegrationFlow outboundtoDB() {
        return IntegrationFlows
                .from("commonInputChannel")
                /*
                 * We publish the msg to be stored into the DB onto a executor
                 * channel (so that the DB operation is processed on a separate
                 * thread).
                 */
                .channel("executorChannelToDB").get();
                /****************************************************************************
                        *********************************************************
                 * How do I route the error from executor channel to error channel over here?
                        **********************************************************
                 ****************************************************************************/
    }

    /*
     * Create an advice bean to handle DB errors. In case of failure, send
     * response to a separate channel.
     */
    @Bean
    public ExpressionEvaluatingRequestHandlerAdvice expressionAdvice() {
        ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
        advice.setFailureChannelName("DBFailureChannel");
        advice.setOnFailureExpressionString("'##Error while storing request into DB'");
        advice.setTrapException(true);
        return advice;
    }

    /*
     * We create a separate flow for DB failure because in future we may need
     * other actions such as retries/notify support in addition to logging.
     */
    @Bean
    public IntegrationFlow failure() {
        return IntegrationFlows.from("DBFailureChannel")
                .channel("errorChannel").get();

    }   
}

UPDATE: As per Gary's suggestion, I now set the ERROR_CHANNEL and REPLY_CHANNEL headers.

    @Bean
    public IntegrationFlow outboundtoDB() {
        return IntegrationFlows
                .from("commonInputChannel")
                // Set the error and reply channel headers, overwriting any existing values
                .enrichHeaders(h -> h.header(MessageHeaders.ERROR_CHANNEL, "errorChannel", true))
                .enrichHeaders(h -> h.header(MessageHeaders.REPLY_CHANNEL, "DBSuccessChannel", true))
                .channel("executorChannelToDB").get();
    }

DBSuccessChannel is set up to handle the response like this:

@Bean
public IntegrationFlow success() {
    return IntegrationFlows
            .from("DBSuccessChannel")
            .wireTap(
                    flow -> flow.handle(msg -> logger
                            .info("Response from storing in DB : "
                                    + msg.getPayload()))).get();
}

But I still get the error:

2018-09-26 23:34:47.398 ERROR 17186 --- [SimpleAsyncTaskExecutor-465] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessageHandlingException: nested exception is java.time.format.DateTimeParseException: Text 'sample creation timestamp' could not be parsed at index 0, failedMessage=GenericMessage [payload=com.td.sba.iep.schema.InstructionRs@37919153, headers={errorChannel=errorChannel, jms_destination=commonInputChannel, Solace_JMS_Prop_IS_Reply_Message=false, priority=0, jms_timestamp=1538018141672, JMS_Solace_isXML=true, replyChannel=DBSuccessChannel, jms_redelivered=true, JMS_Solace_DeliverToOne=false, JMS_Solace_ElidingEligible=false, JMS_Solace_DeadMsgQueueEligible=false, id=ff6c2ea6-b6d6-c67a-7943-6b7db33bb977, jms_messageId=ID:49.37.4.163d608166190664e70:0, timestamp=1538019287394}]

Here, jms_destination is still set to the input channel, and the error keeps getting reposted to commonInputChannel. Can you please help?

Upvotes: 2

Views: 884

Answers (1)

Gary Russell

Reputation: 174664

The advice won't help because it applies only to that endpoint, not to the downstream flow. In any case, even if it did, the handoff to the executor would succeed, and any downstream exceptions are handled by the executor (which gets wrapped in an ErrorHandlingTaskExecutor with a MessagePublishingErrorHandler).
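To make the handoff point concrete, here is a minimal plain-JDK sketch (no Spring involved). The class `ExecutorHandoffDemo` and the helper `errorHandling` are hypothetical names used only for illustration; the helper is a simplified stand-in for the idea behind an error-handling executor, not the framework's implementation.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.function.Consumer;

public class ExecutorHandoffDemo {

    // Hypothetical helper: wraps each task so that any exception it throws
    // is passed to onError on the worker thread instead of escaping.
    static Executor errorHandling(Executor delegate, Consumer<Throwable> onError) {
        return task -> delegate.execute(() -> {
            try {
                task.run();
            } catch (Throwable t) {
                onError.accept(t);
            }
        });
    }

    static List<String> run() {
        List<String> events = new CopyOnWriteArrayList<>();
        CountDownLatch handled = new CountDownLatch(1);

        // Simplest possible async executor: one new thread per task.
        Executor newThreadPerTask = task -> new Thread(task).start();
        Executor wrapped = errorHandling(newThreadPerTask, t -> {
            events.add("error handler saw: " + t.getMessage());
            handled.countDown();
        });

        try {
            // The task fails later, on another thread.
            wrapped.execute(() -> {
                throw new IllegalStateException("DB store failed");
            });
            // execute() returned normally, so the caller sees success.
            events.add("handoff succeeded");
        } catch (RuntimeException e) {
            // Not reached: the exception never propagates to the calling thread.
            events.add("caller saw: " + e.getMessage());
        }

        try {
            handled.await(); // wait until the worker thread has reported the error
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The `try`/`catch` around `execute` plays the role of the advice on the sending endpoint: it only sees the handoff, which succeeds. The failure surfaces only inside the wrapper running on the worker thread, which is why the error routing has to be configured on the executor side or via the errorChannel header.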

Try replacing that component with a header enricher, and set the errorChannel header. Alternatively, you can wrap the TaskExecutor yourself in an ErrorHandlingTaskExecutor with a MessagePublishingErrorHandler (MPEH) configured with your error channel; the executor channel will detect that the TaskExecutor is already an ErrorHandlingTaskExecutor.
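A minimal sketch of the second option, assuming Spring Integration 5.x on the classpath. The class name `WrappedExecutorConfig` and the `dbErrors` channel are hypothetical, and this is a configuration fragment rather than a complete application. As far as I know, an errorChannel header on the failed message still takes precedence over the handler's default channel.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.ExecutorChannel;
import org.springframework.integration.channel.MessagePublishingErrorHandler;
import org.springframework.integration.util.ErrorHandlingTaskExecutor;
import org.springframework.messaging.MessageChannel;

@Configuration
public class WrappedExecutorConfig {

    // Hypothetical channel that receives ErrorMessages for failed DB stores.
    @Bean
    public MessageChannel dbErrors() {
        return new DirectChannel();
    }

    // Publishes exceptions thrown on the executor thread to dbErrors
    // when the failed message carries no errorChannel header.
    @Bean
    public MessagePublishingErrorHandler dbErrorHandler() {
        MessagePublishingErrorHandler handler = new MessagePublishingErrorHandler();
        handler.setDefaultErrorChannel(dbErrors());
        return handler;
    }

    // The executor is already an ErrorHandlingTaskExecutor, so the
    // channel should use it as-is instead of wrapping it again.
    @Bean(name = "executorChannelToDB")
    public ExecutorChannel executorChannelToDB() {
        return new ExecutorChannel(
                new ErrorHandlingTaskExecutor(new SimpleAsyncTaskExecutor(), dbErrorHandler()));
    }
}
```

A flow subscribed to `dbErrors` can then log the ErrorMessage and end there, which keeps the error handling for the DB flow separate from the rest of the application.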

EDIT

This works fine for me...

@SpringBootApplication
public class So52526134Application {

    public static void main(String[] args) {
        SpringApplication.run(So52526134Application.class, args);
    }

    @Bean
    public IntegrationFlow mainFlow() {
        return IntegrationFlows.from(() -> "foo", e -> e.poller(Pollers.fixedDelay(5000)))
                .enrichHeaders(h -> h.header(MessageHeaders.ERROR_CHANNEL, "myErrors.input"))
                .channel(MessageChannels.executor(executor()))
                .handle((p, h) -> {
                    throw new RuntimeException("foo");
                })
                .get();
    }

    @Bean
    public IntegrationFlow myErrors() {
        return f -> f.handle((p, h) -> {
            System.out.println("in my error flow");
            return p;
        })
        .handle(System.out::println);
    }

    @Bean
    public TaskExecutor executor() {
        return new ThreadPoolTaskExecutor();
    }

}

and the output is:

in my error flow
ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: ...

Upvotes: 1
