Reputation: 117
I am quite new to spring integration. I am evaluating the Spring Integration for our project. I am run into one issue on how to handle the Exception.
I am using the publishSubscribeChannel for handling the message. I am not sure if this is a correct approach or not. When a exception is thrown inside publishSubscribeChannel, I would like it to route to a different channel so I can reply to different HTTP status code.
How do I route the exception inside the publishSubscribeChannel to the errorChannel.
I have the following code. I have tried to use routeException
in a different area of the code but no luck. Can someone please help me with how to solve this?
@Configuration
@EnableIntegration
public class IntegrationConfiguration {
// curl http://localhost:8080/tasks --data '{"username":"xyz","password":"xyz"}' -H 'Content-type: application/json'
@Bean
MessageChannel directChannel() {
return MessageChannels.direct().get();
}
@Bean
public IntegrationFlow httpGateway() {
return IntegrationFlows.from(
Http.inboundGateway("/tasks")
.requestMapping(m -> m.methods(HttpMethod.POST))
.requestPayloadType(String.class)
.requestChannel(directChannel())
.get()
)
.transform(t -> {
return "transofrm " + t;
})
.channel("queueChannel")
.routeByException(r -> {
r.channelMapping(RuntimeException.class, "errorChannel");
r.defaultOutputToParentFlow();
})
.get();
}
@Bean
public IntegrationFlow handleMessage() {
return IntegrationFlows.from("queueChannel")
.wireTap(flow -> flow.handle(System.out::println))
.routeByException(r -> {
r.channelMapping(RuntimeException.class, "errorChannel");
r.defaultOutputToParentFlow();
})
.publishSubscribeChannel(publisher -> {
publisher.errorHandler(var1 -> {
var1.printStackTrace();
})
.subscribe(flow -> flow
.handle(m -> {
if (m.getPayload().toString().contains("user")) {
throw new RuntimeException("user found");
}
System.out.println("subscribed " + m.getPayload());
})
);
}
)
.transform(t -> "")
.wireTap(flow -> flow.handle(m -> {
System.out.println(m.getHeaders().get("status"));
}))
.enrichHeaders( c -> c.header(HttpHeaders.STATUS_CODE, HttpStatus.OK))
.get();
}
@Bean
IntegrationFlow exceptionOrErrorFlow() {
return IntegrationFlows.from("errorChannel")
.wireTap(f -> f.handle(m -> System.out.println("failed badly")))
.enrichHeaders(c -> c.header(HttpHeaders.STATUS_CODE, HttpStatus.BAD_REQUEST))
.get();
}
}
Upvotes: 0
Views: 760
Reputation: 174729
It is not entirely clear what you are trying to do. You generally should not use a queue channel in a flow like this.
Simply add an .errorChannel
to the inbound gateway and any downstream exceptions will be sent to that channel, to which you can add a subscriber to handle the exceptions.
Also, you should not call get()
on internally declared specs (those that are not beans) like that, use the form of .from()
that takes a ...Spec directly. Otherwise the bean won't be initialized properly.
return IntegrationFlows.from(
Http.inboundGateway("/tasks")
.requestMapping(m -> m.methods(HttpMethod.POST))
.requestPayloadType(String.class)
.requestChannel(directChannel())
.errorChannel("myErrors")
)
...
Upvotes: 1