Ben Li

Reputation: 117

Spring Integration: handling an exception in publishSubscribeChannel

I am quite new to Spring Integration and am evaluating it for our project. I have run into an issue with how to handle exceptions.

I am using publishSubscribeChannel to handle the message, though I am not sure this is the correct approach. When an exception is thrown inside publishSubscribeChannel, I would like it to be routed to a different channel so that I can reply with a different HTTP status code.

How do I route an exception thrown inside publishSubscribeChannel to the errorChannel?

I have the following code. I have tried using routeByException in different places in the code, but with no luck. Can someone please help me solve this?

@Configuration
@EnableIntegration
public class IntegrationConfiguration {

    // curl http://localhost:8080/tasks --data '{"username":"xyz","password":"xyz"}' -H 'Content-type: application/json'
    @Bean
    MessageChannel directChannel() {
        return MessageChannels.direct().get();
    }

    @Bean
    public IntegrationFlow httpGateway() {
        return IntegrationFlows.from(
            Http.inboundGateway("/tasks")
                .requestMapping(m -> m.methods(HttpMethod.POST))
                .requestPayloadType(String.class)
                .requestChannel(directChannel())
            .get()
        )
            .transform(t -> {
                return "transform " + t;
            })
            .channel("queueChannel")
            .routeByException(r -> {
                r.channelMapping(RuntimeException.class, "errorChannel");
                r.defaultOutputToParentFlow();
            })
            .get();
    }

    @Bean
    public IntegrationFlow handleMessage() {
        return IntegrationFlows.from("queueChannel")
            .wireTap(flow -> flow.handle(System.out::println))
            .routeByException(r -> {
                r.channelMapping(RuntimeException.class, "errorChannel");
                r.defaultOutputToParentFlow();
            })
            .publishSubscribeChannel(publisher -> {
                publisher.errorHandler(var1 -> {
                    var1.printStackTrace();
                })
                    .subscribe(flow -> flow
                        .handle(m -> {
                            if (m.getPayload().toString().contains("user")) {
                                throw new RuntimeException("user found");
                            }
                            System.out.println("subscribed " + m.getPayload());
                        })
                    );
            })
            .transform(t -> "")
            .wireTap(flow -> flow.handle(m -> {
                System.out.println(m.getHeaders().get("status"));
            }))
            .enrichHeaders( c -> c.header(HttpHeaders.STATUS_CODE, HttpStatus.OK))
            .get();
    }

    @Bean
    IntegrationFlow exceptionOrErrorFlow() {
        return IntegrationFlows.from("errorChannel")
            .wireTap(f -> f.handle(m -> System.out.println("failed badly")))
            .enrichHeaders(c -> c.header(HttpHeaders.STATUS_CODE, HttpStatus.BAD_REQUEST))
            .get();
    }
}

Upvotes: 0

Views: 760

Answers (1)

Gary Russell

Reputation: 174729

It is not entirely clear what you are trying to do. You generally should not use a queue channel in a flow like this.

Simply add an .errorChannel to the inbound gateway and any downstream exceptions will be sent to that channel, to which you can add a subscriber to handle the exceptions.

Also, you should not call get() on internally declared specs (those that are not beans) like that. Instead, use the form of .from() that takes a ...Spec directly; otherwise the bean won't be initialized properly.


    return IntegrationFlows.from(
        Http.inboundGateway("/tasks")
            .requestMapping(m -> m.methods(HttpMethod.POST))
            .requestPayloadType(String.class)
            .requestChannel(directChannel())
            .errorChannel("myErrors")
    )
    ...
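
For the subscriber side, here is a rough sketch of what a flow on that error channel might look like. It assumes the channel name "myErrors" from the snippet above; the class name ErrorFlowConfiguration, the bean name errorFlow, and the choice of BAD_REQUEST and of the reply text are illustrative assumptions, not something the question prescribes. The payload arriving on an error channel is the Throwable (downstream handler failures are normally wrapped in a MessagingException), so the sketch replaces it with a plain String before the reply goes back to the gateway.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.HttpStatus;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.http.HttpHeaders;

@Configuration
public class ErrorFlowConfiguration {

    // Hypothetical bean; subscribes to the "myErrors" channel set on the inbound gateway.
    @Bean
    public IntegrationFlow errorFlow() {
        return IntegrationFlows.from("myErrors")
            // Tell the HTTP gateway which status to use for the reply (assumed: 400).
            .enrichHeaders(h -> h.header(HttpHeaders.STATUS_CODE, HttpStatus.BAD_REQUEST))
            // The payload here is the Throwable; reply with the root cause's message if there is one.
            .<Throwable, String>transform(t ->
                t.getCause() != null ? t.getCause().getMessage() : t.getMessage())
            .get();
    }
}
```

Because this flow has no explicit output channel, its result should be returned to the gateway as the reply, the same way a normal reply would be.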

Upvotes: 1
