StartingDev

Reputation: 91

Are there any code samples for spring integration dsl error handling?

The JmsTests.java was very helpful in understanding ways to structure my code. Code is at https://github.com/spring-projects/spring-integration-java-dsl/blob/master/src/test/java/org/springframework/integration/dsl/test/jms/JmsTests.java

For error handling, though, I am figuring it out as I go. Are there any tests or reference code that show good ways to structure error channels, or to use sub-flows to delegate error handling?

Adding more context to the question:

  1. jmsMessageDrivenFlow(): reads from the incoming queue and prints the payload. If there is an error, it is sent to errorChannel().
  2. handleErrors(): is supposed to listen to errorChannel and send anything that arrives there to the fatalErrorQueue.

jmsMessageDrivenFlow() works fine and prints the message. I expect the error handler to be called only if there is an error during jmsMessageDrivenFlow(). However, handleErrors() also gets called, and it puts a 'Dispatcher failed to deliver Message' message in the fatal queue.

I expected that the error channel wouldn't be invoked at all in this scenario, since jmsMessageDrivenFlow() didn't produce any errors. If I leave out .errorChannel(errorChannel()) in jmsMessageDrivenFlow(), the error flow is not invoked and only jmsMessageDrivenFlow() executes, as expected.

@EnableIntegration
@IntegrationComponentScan
@Component
@Configuration
public class MessageReceiver {
    private static final Logger logger = LoggerFactory.getLogger(MessageReceiver.class);
    String fatalErrorQueue = "fatal";
    String incomingQueue = "incoming";

    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;

    @Bean
    public Queue incomingQueue() {
        return new ActiveMQQueue(incomingQueue);
    }

    @Bean
    public MessageChannel errorChannel() {
        return new DirectChannel();
    }

    @Bean
    public IntegrationFlow handleErrors() {
        return IntegrationFlows
                .from("errorChannel")
                .handle((payload,headers) -> {
                    System.out.println("processing error: "+payload.toString());
                    return payload;
                })
                .handle(Jms.outboundAdapter(jmsMessagingTemplate.getConnectionFactory()).destination(fatalErrorQueue))
                .get();
    }

    @Bean
    public IntegrationFlow jmsMessageDrivenFlow() {
        return IntegrationFlows
                .from(
                        Jms.messageDriverChannelAdapter(jmsMessagingTemplate.getConnectionFactory())
                                .destination(incomingQueue)
                                .errorChannel(errorChannel())
                )
                .handle((payload,headers) ->{
                    System.out.println("processing payload: "+payload.toString());
                    return payload;
                })
                .get();
    }
}

Log of the execution:

2016-01-22 10:18:20,531 DEBUG DefaultMessageListenerContainer-1 ServiceActivatingHandler.handleMessage - ServiceActivator for [org.springframework.integration.dsl.LambdaMessageProcessor@522d5d91] (org.springframework.integration.handler.ServiceActivatingHandler#1) received message: GenericMessage [payload=SAMPLE QUEUE MESSAGE, headers={jms_redelivered=false, jms_correlationId=, jms_type=, id=78d5456e-4442-0c2b-c545-870d2c177802, priority=0, jms_timestamp=1453479493323, jms_messageId=ID:crsvcdevlnx01.chec.local-47440-1453310266960-6:2:1:1:1, timestamp=1453479500531}]
processing payload: SAMPLE QUEUE MESSAGE
2016-01-22 10:18:20,535 DEBUG DefaultMessageListenerContainer-1 DirectChannel.send - preSend on channel 'errorChannel', message: ErrorMessage [payload=org.springframework.messaging.MessagingException: Dispatcher failed to deliver Message; nested exception is org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available, headers={id=dc87f71d-a38c-4718-a2e9-2fba2a7c847c, timestamp=1453479500535}]
2016-01-22 10:18:20,535 DEBUG DefaultMessageListenerContainer-1 ServiceActivatingHandler.handleMessage - ServiceActivator for [org.springframework.integration.dsl.LambdaMessageProcessor@5545b00b] (org.springframework.integration.handler.ServiceActivatingHandler#0) received message: ErrorMessage [payload=org.springframework.messaging.MessagingException: Dispatcher failed to deliver Message; nested exception is org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available, headers={id=dc87f71d-a38c-4718-a2e9-2fba2a7c847c, timestamp=1453479500535}]
processing error: org.springframework.messaging.MessagingException: Dispatcher failed to deliver Message; nested exception is org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available
2016-01-22 10:18:20,535 DEBUG DefaultMessageListenerContainer-1 DirectChannel.send - preSend on channel 'handleErrors.channel#0', message: ErrorMessage [payload=org.springframework.messaging.MessagingException: Dispatcher failed to deliver Message; nested exception is org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available, headers={id=f434348c-f659-1e88-2eba-3d467761ab47, timestamp=1453479500535}]
2016-01-22 10:18:20,535 DEBUG DefaultMessageListenerContainer-1 JmsSendingMessageHandler.handleMessage - org.springframework.integration.jms.JmsSendingMessageHandler#0 received message: ErrorMessage [payload=org.springframework.messaging.MessagingException: Dispatcher failed to deliver Message; nested exception is org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available, headers={id=f434348c-f659-1e88-2eba-3d467761ab47, timestamp=1453479500535}]
2016-01-22 10:18:20,537 DEBUG DefaultMessageListenerContainer-1 DynamicJmsTemplate.execute - Executing callback on JMS Session: ActiveMQSession {id=ID:MACD13-60edd8d-49463-1453479492653-1:1:1,started=true} java.lang.Object@858db12
2016-01-22 10:18:20,568 DEBUG DefaultMessageListenerContainer-1 DynamicJmsTemplate.doSend - Sending created message: ActiveMQObjectMessage {commandId = 0, responseRequired = false, messageId = null, originalDestination = null, originalTransactionId = null, producerId = null, destination = null, transactionId = null, expiration = 0, timestamp = 0, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = false, type = null, priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@559da78d, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = {timestamp=1453479500535}, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false}
2016-01-22 10:18:20,570 DEBUG DefaultMessageListenerContainer-1 DirectChannel.send - postSend (sent=true) on channel 'handleErrors.channel#0', message: ErrorMessage [payload=org.springframework.messaging.MessagingException: Dispatcher failed to deliver Message; nested exception is org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available, headers={id=f434348c-f659-1e88-2eba-3d467761ab47, timestamp=1453479500535}]
2016-01-22 10:18:20,571 DEBUG DefaultMessageListenerContainer-1 DirectChannel.send - postSend (sent=true) on channel 'errorChannel', message: ErrorMessage [payload=org.springframework.messaging.MessagingException: Dispatcher failed to deliver Message; nested exception is org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available, headers={id=dc87f71d-a38c-4718-a2e9-2fba2a7c847c, timestamp=1453479500535}]

Upvotes: 1

Views: 3112

Answers (1)

Artem Bilan

Reputation: 121560

OK, thanks. From a high-level view, everything works as expected:

  1. Your jmsMessageDrivenFlow() is a one-way flow.
  2. You have a replying (non-terminal) .handle() at the end of that flow:

    .handle((payload, headers) -> {
        System.out.println("processing payload: " + payload.toString());
        return payload;
    })
    

    Since you return a value, the flow expects an output channel or a replyChannel header. The ErrorMessage tells you exactly that.

  3. To get rid of the error altogether, in your case you can just change .handle((payload, headers) -> to .handle(message ->, which is a one-way handler and stops your flow right there.
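The difference between the two handler styles can be sketched without Spring at all. The following is a plain-Java model, not Spring Integration code: `replyingHandle` and `oneWayHandle` are hypothetical stand-ins for the two `.handle(...)` variants, and `IllegalStateException` is a simplification of the real DestinationResolutionException.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Consumer;

public class HandlerStyles {

    // Stand-in for .handle((payload, headers) -> ...) as the LAST step of a flow:
    // there is no next channel, so a non-null return value has nowhere to go.
    static void replyingHandle(Object payload, Map<String, Object> headers,
                               BiFunction<Object, Map<String, Object>, Object> handler) {
        Object reply = handler.apply(payload, headers);
        if (reply != null) {
            throw new IllegalStateException("no output-channel or replyChannel header available");
        }
    }

    // Stand-in for .handle(message -> ...): nothing is returned, so the flow ends here.
    static void oneWayHandle(Object payload, Consumer<Object> handler) {
        handler.accept(payload);
    }

    public static void main(String[] args) {
        // One-way style: prints the payload and finishes without any error.
        oneWayHandle("SAMPLE QUEUE MESSAGE", p -> System.out.println("processing payload: " + p));

        // Replying style with a returned payload: fails after the handler has run.
        try {
            replyingHandle("SAMPLE QUEUE MESSAGE", new HashMap<>(), (p, h) -> {
                System.out.println("processing payload: " + p);
                return p;
            });
        } catch (IllegalStateException e) {
            System.out.println("error: " + e.getMessage());
        }
    }
}
```

In the model, as in the log from the question, the payload is processed successfully first and the failure only appears afterwards, when the returned value needs a destination.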

Since there really is a problem at the point right after that replying .handle(), the messaging gateway reacts to it correctly:

    try {
        this.messagingTemplate.convertAndSend(requestChannel, object, this.historyWritingPostProcessor);
    }
    catch (Exception e) {
        MessageChannel errorChannel = getErrorChannel();
        if (errorChannel != null) {
            this.messagingTemplate.send(errorChannel, new ErrorMessage(e));
        }
        // ...
    }

And that's why your handleErrors() is able to handle that error.
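A minimal plain-Java sketch of that routing decision follows. It is a model under stated assumptions, not the framework code: the error channel is represented by a plain list, `send` is a hypothetical stand-in for the gateway method, and the rethrow in the `else` branch is an assumption about what happens when no error channel is configured.

```java
import java.util.ArrayList;
import java.util.List;

public class GatewayErrorRouting {

    // Models the try/catch shown above: a failure anywhere downstream is caught and,
    // if an error channel is configured, sent there instead of propagating.
    static void send(Runnable downstreamFlow, List<Exception> errorChannel) {
        try {
            downstreamFlow.run();
        } catch (RuntimeException e) {
            if (errorChannel != null) {
                errorChannel.add(e);
            } else {
                throw e; // assumption: with no error channel the exception reaches the caller
            }
        }
    }

    public static void main(String[] args) {
        Runnable failingFlow = () -> {
            throw new IllegalStateException("no output-channel or replyChannel header available");
        };

        // With an error channel configured, the failure is delivered to it.
        List<Exception> errorChannel = new ArrayList<>();
        send(failingFlow, errorChannel);
        System.out.println("errors routed: " + errorChannel.size());

        // Without one, the failure propagates to whoever called send().
        try {
            send(failingFlow, null);
        } catch (IllegalStateException e) {
            System.out.println("propagated: " + e.getMessage());
        }
    }
}
```

The gateway does not distinguish between a business error and a misconfigured flow end: anything thrown downstream of the send takes the same path.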

Without errorChannel() on the Jms.messageDriverChannelAdapter() we end up in the AbstractMessageListenerContainer:

protected void executeListener(Session session, Message message) {
    try {
        doExecuteListener(session, message);
    }
    catch (Throwable ex) {
        handleListenerException(ex);
    }
}

where its default ErrorHandler is ... null and we see only:

logger.warn("Execution of JMS message listener failed, and no ErrorHandler has been set.", ex);

Isn't that what you see?

UPDATE

Can you please explain why messagingGateway sends the payload to errorChannel() just because I have defined an error channel for non-happy-path scenarios?

You would end up with the same error with an XML definition, too: the Java DSL uses the same components under the hood. The gateway code is plain Java. There is a try...catch around convertAndSend, and at runtime your flow hits a condition where something is wrong; that is why you get the DestinationResolutionException. We can't detect this during the init phase, because whether your .handle() returns a value or null (in which case nothing has to be sent) is only known at runtime. So, from the configuration perspective, everything is OK. On the other hand, even without an outputChannel the flow can still work in a request/reply scenario, when the message carries a replyChannel header.
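The runtime decision described here can be sketched in plain Java. This is a simplified model rather than the framework implementation: channels are represented by plain lists, `produceReply` is a hypothetical name, and the lookup order (configured output channel first, then the replyChannel header) is how the model is assumed to behave.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ReplyResolution {

    // Decides, per message and only at runtime, where a handler's return value goes.
    static void produceReply(Object reply, List<Object> outputChannel,
                             Map<String, List<Object>> headers) {
        if (reply == null) {
            return; // a null result means there is nothing to send
        }
        List<Object> target = outputChannel != null ? outputChannel : headers.get("replyChannel");
        if (target == null) {
            throw new IllegalStateException("no output-channel or replyChannel header available");
        }
        target.add(reply);
    }

    public static void main(String[] args) {
        Map<String, List<Object>> headers = new HashMap<>();

        // Same configuration, null result: no error at all.
        produceReply(null, null, headers);

        // Same configuration, request/reply message carrying a replyChannel header: works.
        List<Object> replies = new ArrayList<>();
        headers.put("replyChannel", replies);
        produceReply("SAMPLE QUEUE MESSAGE", null, headers);
        System.out.println("replies: " + replies);

        // Same configuration, one-way message with a non-null result: fails.
        try {
            produceReply("SAMPLE QUEUE MESSAGE", null, new HashMap<>());
        } catch (IllegalStateException e) {
            System.out.println("error: " + e.getMessage());
        }
    }
}
```

All three calls use the same "configuration" (no output channel), which is why the outcome cannot be validated at startup.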

Upvotes: 1
