gbalcisoy
gbalcisoy

Reputation: 127

Spring Integration DSL: Custom Error Channel Does Not Work

I use the Java DSL for Spring Integration. With the code below, I cannot get my custom error flow to run. When the authenticate method throws a RuntimeException, the message is processed by the errorChannel configured on the gateway. I enrich the errorChannel header so that my custom error flow is used, but it is never invoked.

// In Class - 1
 /**
  * Web-service inbound gateway: requests are sent to {@code incomingRequest.input}, replies
  * are taken from {@code outgoingResponse.input}, and errors go to {@code errorChannel}.
  * The supplied marshaller is used for both marshalling and unmarshalling.
  */
 @Bean
    public MarshallingWebServiceInboundGateway marshallingWebServiceInboundGateway(BeanFactoryChannelResolver channelResolver, Jaxb2Marshaller marshaller) {
        MarshallingWebServiceInboundGateway gateway = new MarshallingWebServiceInboundGateway();

        // One marshaller instance serves both directions.
        gateway.setMarshaller(marshaller);
        gateway.setUnmarshaller(marshaller);

        // Channels are obtained through the supplied channel resolver.
        gateway.setRequestChannel(channelResolver.resolveDestination("incomingRequest.input"));
        gateway.setReplyChannel(channelResolver.resolveDestination("outgoingResponse.input"));
        gateway.setErrorChannel(channelResolver.resolveDestination("errorChannel"));
        return gateway;
    }


// In Class - 2
// Routes each request by the runtime class of its payload. Every mapped type is forwarded
// through a gateway to its own flow ("type1.input" / "type2.input") with the transactional
// endpoint option applied.
// NOTE(review): payloadType1()/payloadType2() are defined elsewhere; presumably they return
// the Class objects used as routing keys - confirm.
@Bean
    public IntegrationFlow incomingRequest() {
        return f -> f.<Object, Class<?>>route(t -> t.getClass(),
                mapping -> mapping.subFlowMapping(payloadType1(),
                        sf -> sf.gateway("type1.input", ConsumerEndpointSpec::transactional))
                        .subFlowMapping(payloadType2(),
                                sf -> sf.gateway("type2.input", ConsumerEndpointSpec::transactional)),
                        conf -> conf.id("router:Incoming request router"));
    }

// In Class - 3
    /**
     * Pipeline for Type1 requests: set the errorChannel header, authenticate, transform the
     * payload, copy {@code payload.id} / {@code payload.messageType} into headers, invoke
     * {@code repository::successResponseMessage} and send the result to
     * {@code outgoingResponse.input}.
     */
    @Bean
    public IntegrationFlow type1() {
        return flow -> flow
                // Point the errorChannel header at the custom "error222" channel.
                .enrichHeaders(headers -> headers.header(MessageHeaders.ERROR_CHANNEL, "error222", true))
                .<Type1>handle((payload, headers) -> authentication.authenticate(payload),
                        endpoint -> endpoint.id("service-activator:Authenticate"))
                .transform(transformer::transformType1MsgToDataX,
                        endpoint -> endpoint.id("transform:Unmarshall type1 Message"))
                .enrichHeaders(headers -> headers
                        .headerExpression(TypeDataIntegrationMessageHeaderAccessor.MESSAGE_ID, "payload.id")
                        .headerExpression(TypeDataIntegrationMessageHeaderAccessor.MESSAGE_TYPE, "payload.messageType"))
                .handle((GenericHandler<DataX>) repository::successResponseMessage,
                        endpoint -> endpoint.id("service-activator:return success"))
                .channel("outgoingResponse.input");
    }

// In Class - 3
/**
 * Flow that starts from the {@code error222} channel and delegates to the
 * {@code failureResponseMessage} method of the {@code repository} bean.
 */
@Bean
    public IntegrationFlow error222Flow() {
        return IntegrationFlows
                .from("error222")
                .handle("repository", "failureResponseMessage")
                .get();
    }

EDIT:

After Artem's answer, my code looks like the snippet below. However, I still can't access the header parameter in the error flow. I get the error "No channel resolved by router 'router:error response prepare'".

 // In Class - 1
 /**
  * Web-service inbound gateway: requests are sent to {@code incomingRequest.input}, replies
  * are taken from {@code outgoingResponse.input}, and errors go to the custom
  * {@code errorResponse.input} channel. The supplied marshaller is used for both
  * marshalling and unmarshalling.
  */
 @Bean
    public MarshallingWebServiceInboundGateway marshallingWebServiceInboundGateway(BeanFactoryChannelResolver channelResolver, Jaxb2Marshaller marshaller) {
        MarshallingWebServiceInboundGateway gateway = new MarshallingWebServiceInboundGateway();

        // One marshaller instance serves both directions.
        gateway.setMarshaller(marshaller);
        gateway.setUnmarshaller(marshaller);

        // Channels are obtained through the supplied channel resolver.
        gateway.setRequestChannel(channelResolver.resolveDestination("incomingRequest.input"));
        gateway.setReplyChannel(channelResolver.resolveDestination("outgoingResponse.input"));
        gateway.setErrorChannel(channelResolver.resolveDestination("errorResponse.input"));
        return gateway;
    }


// In Class - 2
// Routes each request by the runtime class of its payload; each mapped type is sent through
// a gateway to "type1.input" or "type2.input" with the transactional endpoint option.
// NOTE(review): payloadType1()/payloadType2() are defined elsewhere; presumably they return
// the Class objects used as routing keys - confirm.
@Bean
    public IntegrationFlow incomingRequest() {
        return f -> f.<Object, Class<?>>route(t -> t.getClass(),
                mapping -> mapping.subFlowMapping(payloadType1(),
                        sf -> sf.gateway("type1.input", ConsumerEndpointSpec::transactional))
                        .subFlowMapping(payloadType2(),
                                sf -> sf.gateway("type2.input", ConsumerEndpointSpec::transactional)),
                        conf -> conf.id("router:Incoming request router"));
    }

// In Class - 2
// Error flow: routes on the "ABCDEF" header of the failed message carried by the exception.
// Only the value "ABCDEF" is mapped (to customError.input, called through a gateway).
// NOTE(review): if the failed message does not carry that header the routing key is null,
// which would match the reported "No channel resolved by router" error - confirm with
// DEBUG logging.
@Bean 
public IntegrationFlow errorResponse(){ 
    return f -> f.<MessageHandlingException, Object>route(t -> t.getFailedMessage().getHeaders().get("ABCDEF"), 
                        mapping -> mapping.subFlowMapping("ABCDEF", 
                                sf -> sf.gateway("customError.input", ConsumerEndpointSpec::transactional)), 
                                conf -> conf.id("router:error response prepare"));
}

// In Class - 3
    /**
     * Pipeline for Type1 requests: add the {@code ABCDEF} marker header, authenticate,
     * transform the payload, copy {@code payload.id} / {@code payload.messageType} into
     * headers, invoke {@code repository::successResponseMessage} and send the result to
     * {@code outgoingResponse.input}.
     */
    @Bean
    public IntegrationFlow type1() {
        return flow -> flow
                // Marker header that the error flow routes on.
                .enrichHeaders(headers -> headers.header("ABCDEF", "ABCDEF", true))
                .<Type1>handle((payload, headers) -> authentication.authenticate(payload),
                        endpoint -> endpoint.id("service-activator:Authenticate"))
                .transform(transformer::transformType1MsgToDataX,
                        endpoint -> endpoint.id("transform:Unmarshall type1 Message"))
                .enrichHeaders(headers -> headers
                        .headerExpression(TypeDataIntegrationMessageHeaderAccessor.MESSAGE_ID, "payload.id")
                        .headerExpression(TypeDataIntegrationMessageHeaderAccessor.MESSAGE_TYPE, "payload.messageType"))
                .handle((GenericHandler<DataX>) repository::successResponseMessage,
                        endpoint -> endpoint.id("service-activator:return success"))
                .channel("outgoingResponse.input");
    }

// In Class - 3
/**
 * Error-handling flow: hands the {@code MessageHandlingException} payload to
 * {@code eventRepository::failureResponseMessage}.
 */
@Bean
    public IntegrationFlow customError() {
        return flow -> flow.handle(
                (GenericHandler<MessageHandlingException>) eventRepository::failureResponseMessage,
                endpoint -> endpoint.id("service-activator:return failure"));
    }

EDIT - 2:

I tried Artem's test code, and it works in that scenario. However, if I convert the type1 flow to a subflow mapping as below (I did this because I suspected my subflow code block), the error flow does not print the ABCDEF header value. After that, I added another header (XYZTRW) in the subflow mapping, but it is not printed either.

// Routes on the payload's string form; "foo" is the only mapping and is sent through a
// gateway to fooFlow.input with the transactional endpoint option.
// The XYZTRW header enrichment is chained after the gateway call inside the sub-flow.
@Bean
public IntegrationFlow type1() {
    return f -> f.<String, String>route(t -> t.toString(), mapping -> mapping.subFlowMapping("foo",
            sf -> sf.gateway("fooFlow.input", ConsumerEndpointSpec::transactional).enrichHeaders(h -> h.header("XYZTRW", "XYZTRW", true))));
}

/**
 * Test flow: adds the {@code ABCDEF} header and then always fails with a
 * {@code RuntimeException("intentional")}.
 */
@Bean
public IntegrationFlow fooFlow() {
    return flow -> flow
            .enrichHeaders(headers -> headers.header("ABCDEF", "ABCDEF", true))
            .handle((payload, headers) -> {
                throw new RuntimeException("intentional");
            });
}

My System.out output is:

GenericMessage [payload=foo, headers={history=testGateway,type1.input, id=1fad7a65-4abe-c41d-0b22-36839a103269, timestamp=1503029553071}]

Upvotes: 1

Views: 7764

Answers (1)

Artem Bilan
Artem Bilan

Reputation: 121550

The errorChannel header only comes into play when the message is handed off to a different thread, i.e. through an executor channel or a queue channel. Otherwise, the standard throw and try...catch mechanism applies within the same call stack.

So in your case the authentication exception is simply thrown back to the caller, the WS Inbound Gateway, and there you have configured the global error channel.

I did this testing:

/**
 * Minimal test configuration: an error flow that unwraps the failed message from the
 * exception, a flow that always fails after adding the {@code ABCDEF} header, and a
 * queue-backed {@code errorChannel}.
 */
@Configuration
@EnableIntegration
@IntegrationComponentScan
public static class ContextConfiguration {

    /** Polls the error channel (fixed delay of 100) and maps each exception to its failed message. */
    @Bean
    public IntegrationFlow errorResponse() {
        return IntegrationFlows.from(errorChannel())
                .<MessagingException, Message<?>>transform(
                        MessagingException::getFailedMessage,
                        endpoint -> endpoint.poller(poller -> poller.fixedDelay(100)))
                .get();
    }

    /** Adds the ABCDEF header, then throws so that error handling is triggered. */
    @Bean
    public IntegrationFlow type1() {
        return flow -> flow
                .enrichHeaders(headers -> headers.header("ABCDEF", "ABCDEF", true))
                .handle((payload, headers) -> {
                    throw new RuntimeException("intentional");
                });
    }

    /** Queue-backed channel used as the error channel. */
    @Bean
    public PollableChannel errorChannel() {
        return new QueueChannel();
    }
}

/**
 * Test entry point: requests default to the {@code type1.input} channel and the gateway's
 * error channel is {@code errorChannel}.
 */
@MessagingGateway(errorChannel = "errorChannel", defaultRequestChannel = "type1.input")
public interface TestGateway {

    // Sends the given payload through the gateway and returns the resulting message.
    Message<?> sendTest(String payload);

}

...

@Autowired
private TestGateway testGateway;

/** Sends "foo" through the gateway and prints whatever message comes back. */
@Test
public void testErrorChannel() {
    System.out.println(this.testGateway.sendTest("foo"));
}

And my SOUT shows me:

GenericMessage [payload=foo, headers={ABCDEF=ABCDEF, id=ae5d2d44-46b7-912d-17d4-bf2ee656140a, timestamp=1502999446725}]

Please enable DEBUG logging for the org.springframework.integration category and observe at which step your message loses the desired headers.

UPDATE

OK, I see your problem. Because you use sf -> sf.gateway("fooFlow.input", ConsumerEndpointSpec::transactional), in other words you call the downstream flow through a gateway, everything that happens there is hidden behind the gateway. In case of an error, the only message you get back as the failed ("guilty") message is the one you sent there: the gateway's request message. The downstream failedMessage is swallowed by default.

To fix the problem, consider adding an errorChannel() option to that .gateway() and handling the downstream error there. Or simply don't use .gateway() in the router's subflow; use a plain channel mapping instead.

The .transactional() option can also be configured on any .handle().

Upvotes: 2

Related Questions