Reputation: 91
JmsTests.java was very helpful in understanding how to structure my code. It is at https://github.com/spring-projects/spring-integration-java-dsl/blob/master/src/test/java/org/springframework/integration/dsl/test/jms/JmsTests.java
For error handling, though, I am figuring it out as I go. Are there any tests or reference code that show good ways to structure error channels, or to use sub-flows to delegate error handling?
Adding more context to the question:
jmsMessageDrivenFlow() works fine and prints the message. I expect the error handler to be called only if there is an error during jmsMessageDrivenFlow(). However, the handleErrors() flow also gets called, and it puts a 'Dispatcher failed to deliver Message' message in the fatal queue.
I expected the error channel not to be invoked at all in this scenario, since jmsMessageDrivenFlow() didn't raise any errors. If I leave out .errorChannel(errorChannel()) in jmsMessageDrivenFlow(), the error flow is not invoked and only jmsMessageDrivenFlow() executes, as expected.
@EnableIntegration
@IntegrationComponentScan
@Component
@Configuration
public class MessageReceiver {

    private static final Logger logger = LoggerFactory.getLogger(MessageReceiver.class);

    String fatalErrorQueue = "fatal";
    String incomingQueue = "incoming";

    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;

    @Bean
    public Queue incomingQueue() {
        return new ActiveMQQueue(incomingQueue);
    }

    @Bean
    public MessageChannel errorChannel() {
        return new DirectChannel();
    }

    @Bean
    public IntegrationFlow handleErrors() {
        return IntegrationFlows
                .from("errorChannel")
                .handle((payload, headers) -> {
                    System.out.println("processing error: " + payload.toString());
                    return payload;
                })
                .handle(Jms.outboundAdapter(jmsMessagingTemplate.getConnectionFactory()).destination(fatalErrorQueue))
                .get();
    }

    @Bean
    public IntegrationFlow jmsMessageDrivenFlow() {
        return IntegrationFlows
                .from(
                        Jms.messageDriverChannelAdapter(jmsMessagingTemplate.getConnectionFactory())
                                .destination(incomingQueue)
                                .errorChannel(errorChannel())
                )
                .handle((payload, headers) -> {
                    System.out.println("processing payload: " + payload.toString());
                    return payload;
                })
                .get();
    }
}
Log of the execution:
2016-01-22 10:18:20,531 DEBUG DefaultMessageListenerContainer-1 ServiceActivatingHandler.handleMessage - ServiceActivator for [org.springframework.integration.dsl.LambdaMessageProcessor@522d5d91] (org.springframework.integration.handler.ServiceActivatingHandler#1) received message: GenericMessage [payload=SAMPLE QUEUE MESSAGE, headers={jms_redelivered=false, jms_correlationId=, jms_type=, id=78d5456e-4442-0c2b-c545-870d2c177802, priority=0, jms_timestamp=1453479493323, jms_messageId=ID:crsvcdevlnx01.chec.local-47440-1453310266960-6:2:1:1:1, timestamp=1453479500531}]
processing payload: SAMPLE QUEUE MESSAGE
2016-01-22 10:18:20,535 DEBUG DefaultMessageListenerContainer-1 DirectChannel.send - preSend on channel 'errorChannel', message: ErrorMessage [payload=org.springframework.messaging.MessagingException: Dispatcher failed to deliver Message; nested exception is org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available, headers={id=dc87f71d-a38c-4718-a2e9-2fba2a7c847c, timestamp=1453479500535}]
2016-01-22 10:18:20,535 DEBUG DefaultMessageListenerContainer-1 ServiceActivatingHandler.handleMessage - ServiceActivator for [org.springframework.integration.dsl.LambdaMessageProcessor@5545b00b] (org.springframework.integration.handler.ServiceActivatingHandler#0) received message: ErrorMessage [payload=org.springframework.messaging.MessagingException: Dispatcher failed to deliver Message; nested exception is org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available, headers={id=dc87f71d-a38c-4718-a2e9-2fba2a7c847c, timestamp=1453479500535}]
processing error: org.springframework.messaging.MessagingException: Dispatcher failed to deliver Message; nested exception is org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available
2016-01-22 10:18:20,535 DEBUG DefaultMessageListenerContainer-1 DirectChannel.send - preSend on channel 'handleErrors.channel#0', message: ErrorMessage [payload=org.springframework.messaging.MessagingException: Dispatcher failed to deliver Message; nested exception is org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available, headers={id=f434348c-f659-1e88-2eba-3d467761ab47, timestamp=1453479500535}]
2016-01-22 10:18:20,535 DEBUG DefaultMessageListenerContainer-1 JmsSendingMessageHandler.handleMessage - org.springframework.integration.jms.JmsSendingMessageHandler#0 received message: ErrorMessage [payload=org.springframework.messaging.MessagingException: Dispatcher failed to deliver Message; nested exception is org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available, headers={id=f434348c-f659-1e88-2eba-3d467761ab47, timestamp=1453479500535}]
2016-01-22 10:18:20,537 DEBUG DefaultMessageListenerContainer-1 DynamicJmsTemplate.execute - Executing callback on JMS Session: ActiveMQSession {id=ID:MACD13-60edd8d-49463-1453479492653-1:1:1,started=true} java.lang.Object@858db12
2016-01-22 10:18:20,568 DEBUG DefaultMessageListenerContainer-1 DynamicJmsTemplate.doSend - Sending created message: ActiveMQObjectMessage {commandId = 0, responseRequired = false, messageId = null, originalDestination = null, originalTransactionId = null, producerId = null, destination = null, transactionId = null, expiration = 0, timestamp = 0, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = false, type = null, priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@559da78d, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = {timestamp=1453479500535}, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false}
2016-01-22 10:18:20,570 DEBUG DefaultMessageListenerContainer-1 DirectChannel.send - postSend (sent=true) on channel 'handleErrors.channel#0', message: ErrorMessage [payload=org.springframework.messaging.MessagingException: Dispatcher failed to deliver Message; nested exception is org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available, headers={id=f434348c-f659-1e88-2eba-3d467761ab47, timestamp=1453479500535}]
2016-01-22 10:18:20,571 DEBUG DefaultMessageListenerContainer-1 DirectChannel.send - postSend (sent=true) on channel 'errorChannel', message: ErrorMessage [payload=org.springframework.messaging.MessagingException: Dispatcher failed to deliver Message; nested exception is org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available, headers={id=dc87f71d-a38c-4718-a2e9-2fba2a7c847c, timestamp=1453479500535}]
Upvotes: 1
Views: 3112
Reputation: 121560
OK, thanks. I'm investigating, but at a high level everything works as expected: jmsMessageDrivenFlow() is a one-way flow, yet the .handle() at the end of that flow is a replying handler rather than a one-way one:
    .handle((payload, headers) -> {
        System.out.println("processing payload: " + payload.toString());
        return payload;
    })
Since you return something from it, the flow expects either an output-channel on the endpoint or a replyChannel in the message headers. The ErrorMessage tells you exactly that.
That also points to the way to fix the error altogether. In your case you can just change .handle((payload,headers) -> to .handle(message ->, which is a one-way handler and stops your flow exactly there.
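As a configuration fragment, the change described above might look like the following. This is only a sketch: it assumes the method replaces jmsMessageDrivenFlow() inside the MessageReceiver class from the question and reuses that class's fields and imports. Nothing else is changed.

```java
// Sketch only: drop-in replacement for the bean method in the question's MessageReceiver class.
@Bean
public IntegrationFlow jmsMessageDrivenFlow() {
    return IntegrationFlows
            .from(
                    Jms.messageDriverChannelAdapter(jmsMessagingTemplate.getConnectionFactory())
                            .destination(incomingQueue)
                            .errorChannel(errorChannel())
            )
            // One-argument lambda = MessageHandler (returns void), so no reply is produced
            // and the framework does not look for an output channel after this step.
            .handle(message -> System.out.println("processing payload: " + message.getPayload()))
            .get();
}
```

With this variant the error channel stays configured for real failures, but a successfully processed message no longer ends up in the fatal queue.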
Since there really is a problem here (no next endpoint after that replying .handle()), the messaging gateway behind the adapter reacts to it correctly:
    try {
        this.messagingTemplate.convertAndSend(requestChannel, object, this.historyWritingPostProcessor);
    }
    catch (Exception e) {
        MessageChannel errorChannel = getErrorChannel();
        if (errorChannel != null) {
            this.messagingTemplate.send(errorChannel, new ErrorMessage(e));
        }
        // ...
    }
And that's why your handleErrors() flow is able to handle that error.
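To make the chain of events easier to follow, here is a small plain-Java model of it. It is a simplified sketch, not Spring's actual classes: ErrorChannelModel, replyingEndpoint and gatewaySend are hypothetical names, the "error channel" is just a list, and a handler returning null stands in for a one-way handler.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

/** Simplified, hypothetical model of the dispatch logic; not Spring's real classes. */
class ErrorChannelModel {

    /** Stand-in for a replying endpoint: a non-null result needs somewhere to go. */
    static void replyingEndpoint(Function<String, String> handler,
                                 Consumer<String> outputChannel, String payload) {
        String reply = handler.apply(payload);
        if (reply == null) {
            return; // nothing to send on, so the flow simply ends here
        }
        if (outputChannel == null) {
            throw new IllegalStateException("no output-channel or replyChannel header available");
        }
        outputChannel.accept(reply);
    }

    /** Stand-in for the gateway's send: failures go to the error channel if one is set. */
    static void gatewaySend(Runnable downstream, List<String> errorChannel) {
        try {
            downstream.run();
        }
        catch (RuntimeException e) {
            if (errorChannel == null) {
                throw e; // no error channel configured: the exception propagates to the caller
            }
            errorChannel.add(e.getMessage());
        }
    }

    public static void main(String[] args) {
        List<String> errors = new ArrayList<>();
        // The handler returns the payload but nothing is wired after it: an error is collected.
        gatewaySend(() -> replyingEndpoint(p -> p, null, "SAMPLE QUEUE MESSAGE"), errors);
        // The handler returns null (the model's one-way case): no error is collected.
        gatewaySend(() -> replyingEndpoint(p -> null, null, "SAMPLE QUEUE MESSAGE"), errors);
        System.out.println(errors); // prints [no output-channel or replyChannel header available]
    }
}
```

In this model, as in the question's log, the payload is processed successfully first and the failure only appears afterwards, when the reply has nowhere to go.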
Without errorChannel() on the Jms.messageDriverChannelAdapter(), we end up in the AbstractMessageListenerContainer:
    protected void executeListener(Session session, Message message) {
        try {
            doExecuteListener(session, message);
        }
        catch (Throwable ex) {
            handleListenerException(ex);
        }
    }
where its default ErrorHandler is ... null, so all we see is:
logger.warn("Execution of JMS message listener failed, and no ErrorHandler has been set.", ex);
Isn't that what you see?
UPDATE
Can you please explain why messagingGateway sends the payload to errorChannel() just because I have defined an error channel for non-happy-path scenarios?
You will end up with the same error even with the XML definition, because the Java DSL uses the same components under the hood. I won't explain plain Java code: you can see the try...catch around convertAndSend, and it is a condition at runtime that determines that something is wrong. That's why you get that DestinationResolutionException. We can't detect the non-happy path during the init phase, because whether the return value of your .handle() happens to be null is known only at runtime. So, from the configuration perspective everything is OK. On the other hand, even without an outputChannel you can still be fine there in a request/reply scenario, when the message carries a replyChannel header.
Upvotes: 1