Reputation: 359
I have two applications that are throwing messages at each other using Spring Cloud stream and rabbitmq binder. I am unable to subscribe to the global or dedicated error channel. I am sure I am missing one of the annotation prayers to one of the Spring framework gods.
I have tried to simplify the code to the following. (Code is also available on https://github.com/achintmehta/springclouddemo)
In the following code, I am trying to send a message to INPUT channel and returning a message on OUTPUT channel. The stream listener for INPUT channel throws an exception and I see logHandler printing the exception but none of my registered endpoints for errors get called.
public static void main(String[] args) {
SpringApplication.run(SprintclouddemoApplication.class, args);
}
@Override
public void run(ApplicationArguments args) throws Exception {
// Send a message to INPUT Channel
System.out.println("****************** Inside run method *********************");
source.input().send(
MessageBuilder
.withPayload("ACTIVE")
.build());
}
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT) // Send back the response to OUTPUT channel
public String requestReceived(
String state) {
// Receive message on input channel
System.out.println("****************** Received event *********************");
// Throw exception
throw new RuntimeException("!!!!! ABORT ABORT !!!!!");
//return "Event received";
}
@StreamListener(Processor.OUTPUT)
public void responseReceived(Message<?> message) {
// Listen for message on OUTPUT channel
System.out.println("******************* ACK received as : " + message);
}
@ServiceActivator(inputChannel="errorChannel")
public ErrorMessage onError(ErrorMessage message) {
// THIS FUNCTION NEVER GETS CALLED WHEN EXCEPTION HAPPENS
System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Received ERROR: " + message);
return message;
}
@StreamListener("errorChannel")
public void error(Message<?> message) {
// THIS FUNCTION NEVER GETS CALLED WHEN EXCEPTION HAPPENS
System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Handling ERROR: " + message);
}
Following is my application.yml file
spring:
application:
name: cloudstream-demo
rabbitmq:
host: rabbitmq
port: 5672
username: guest
password: guest
cloud:
stream:
bindings:
input:
destination: stateChangeTopic
content-type: application/json
group: stateChangeGroup
output:
destination: stateChangeRspTopic
content-type: application/json
group: stateChangeRspGroup
The output logs are also in the github at following link: https://github.com/achintmehta/springclouddemo/blob/master/logs.txt
Upvotes: 0
Views: 529
Reputation: 6126
There are few things that are wrong with your application
Why are you injecting Processor in your application? I mean the whole purpose for it is to signal to the framework to create necessary bindings to bridge/map local channels with remote destinations (Kafka, Rabbit etc). This means that when you send message to the input
channel directly you are completely bypassing spring-cloud-stream framework and all its features including error handling, message conversion, retries etc. Basically at that point you are simply NOT using spring-cloud-stream. When you throw an exception the exception is propagated back to the original caller. In the case of spring-cloud-stream the original caller is message listening container which catches exception, does retries and then goes through error handling routine including sending to error channel(s). In your case the original caller is you via source.input().send(...)
, not the framework.
The signature here is wrong: @ServiceActivator(inputChannel="errorChannel") public ErrorMessage onError(ErrorMessage message)
. By returning anything other than void
what are your expectations? Where would the ErrorMessage go?
Upvotes: 1