Amar Dev

Reputation: 1479

Spring Integration router behavior change from 4.3.10.RELEASE to 5.1.5.RELEASE

I have an application based on Spring Integration, and part of that application has the following config.

It worked fine with version 4.3.10.RELEASE. Now I am trying to upgrade to 5.1.5.RELEASE, and the application fails to start with the exception mentioned below.

I was able to resolve the issue by changing the config, but I am wondering why this behavior has been changed.

Old config working in 4.3.10.RELEASE:

@Bean
public IntegrationFlow mainFlow(@Qualifier("initChannel") PublishSubscribeChannel inboundChannel, TaskExecutor taskExecutor,
                                StatusUpdater statusUpdater, MyTransformer myTransformer, MyFilter myFilter,
                                AbstractMessageRouter messageRouter, FinalHandler finalHandler, InlineHandler inlineHandler) {
    return flow -> flow.channel(inboundChannel)
            .channel(new ExecutorSubscribableChannel(Executors.newFixedThreadPool(2)))
            .publishSubscribeChannel(taskExecutor, spec -> spec.subscribe(flow1 ->
                    flow1.handle(statusUpdater, "handle"))
                    .subscribe(flow2 -> flow2.transform(myTransformer)
                            .filter(myFilter, "filter")
                            .route(messageRouter)
                            .handle(inlineHandler, "handle")
                    ).errorHandler(new MessagePublishingErrorHandler())
            );
}

Exception in the logs with the above config on version 5.1.5.RELEASE:

Caused by: org.springframework.beans.factory.BeanCreationException: The 'currentComponent' (messageRouter) is a one-way 'MessageHandler' and it isn't appropriate to configure 'outputChannel'. This is the end of the integration flow.
    at org.springframework.integration.dsl.IntegrationFlowDefinition.registerOutputChannelIfCan(IntegrationFlowDefinition.java:3088) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
    at org.springframework.integration.dsl.IntegrationFlowDefinition.register(IntegrationFlowDefinition.java:3033) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
    at org.springframework.integration.dsl.IntegrationFlowDefinition.handle(IntegrationFlowDefinition.java:1185) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
    at org.springframework.integration.dsl.IntegrationFlowDefinition.handle(IntegrationFlowDefinition.java:1001) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
    at org.springframework.integration.dsl.IntegrationFlowDefinition.handle(IntegrationFlowDefinition.java:979) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
    at com.config.IntegrationConfig.lambda$null$1(IntegrationConfig.java:42) ~[classes/:na]
    at org.springframework.integration.dsl.PublishSubscribeSpec.subscribe(PublishSubscribeSpec.java:58) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
    at com.config.IntegrationConfig.lambda$null$2(IntegrationConfig.java:38) ~[classes/:na]
    at org.springframework.integration.dsl.IntegrationFlowDefinition.publishSubscribeChannel(IntegrationFlowDefinition.java:255) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
    at com.config.IntegrationConfig.lambda$mainFlow$3(IntegrationConfig.java:36) ~[classes/:na]
    at org.springframework.integration.dsl.context.IntegrationFlowBeanPostProcessor.processIntegrationFlowImpl(IntegrationFlowBeanPostProcessor.java:309) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
    at org.springframework.integration.dsl.context.IntegrationFlowBeanPostProcessor.postProcessBeforeInitialization(IntegrationFlowBeanPostProcessor.java:117) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.applyBeanPostProcessorsBeforeInitialization(AbstractAutowireCapableBeanFactory.java:414) ~[spring-beans-5.1.7.RELEASE.jar:5.1.7.RELEASE]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1770) ~[spring-beans-5.1.7.RELEASE.jar:5.1.7.RELEASE]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:593) ~[spring-beans-5.1.7.RELEASE.jar:5.1.7.RELEASE]

Updated config working for 5.1.5.RELEASE:

The processing done by the inlineHandler bean now happens in a channel/flow of its own instead of inline. I had to change the messageRouter bean to route the message to the default flow, which previously happened inline.

@Bean
public IntegrationFlow mainFlow(@Qualifier("initChannel") PublishSubscribeChannel inboundChannel, TaskExecutor taskExecutor,
                                StatusUpdater statusUpdater, MyTransformer myTransformer, MyFilter myFilter,
                                AbstractMessageRouter messageRouter, FinalHandler finalHandler, InlineHandler inlineHandler) {
    return flow -> flow.channel(inboundChannel)
            .channel(new ExecutorSubscribableChannel(Executors.newFixedThreadPool(2)))
            .publishSubscribeChannel(taskExecutor, spec -> spec.subscribe(flow1 ->
                    flow1.handle(statusUpdater, "handle"))
                    .subscribe(flow2 -> flow2.transform(myTransformer)
                            .filter(myFilter, "filter")
                            .route(messageRouter)
                            //.handle(inlineHandler, "handle") // removed this
                    ).errorHandler(new MessagePublishingErrorHandler())
            );
}


@Bean
public AbstractMessageRouter messageRouter(@Qualifier("swift") MessageChannel messageChannel, MessageChannel defaultFlow) {
    return new AbstractMessageRouter() {
        @Override
        protected Collection<MessageChannel> determineTargetChannels(Message<?> message) {
            System.out.println(">>>>>> Determining channel");
            if (/* some cond */) {
                return Collections.singleton(defaultFlow);
            }
            return Collections.singleton(messageChannel);
        }
    };
}

Upvotes: 0

Views: 147

Answers (1)

Artem Bilan

Reputation: 121552

I am wondering why this has been changed

Well, let's see that error message again:

The 'currentComponent' (messageRouter) is a one-way 'MessageHandler' and it isn't appropriate to configure 'outputChannel'. This is the end of the integration flow.

Now let's look at the router and how it differs from, e.g., a service activator:

https://docs.spring.io/spring-integration/docs/5.3.2.RELEASE/reference/html/message-routing.html#router-implementations

https://docs.spring.io/spring-integration/docs/5.3.2.RELEASE/reference/html/messaging-endpoints.html#service-activator

As you can see, a router has a mapping that determines the next channel to send the message to. In your old config:

 .route(messageRouter)
 .handle(inlineHandler, "handle")

With this chain of methods you effectively had an unconditional mapping of the router's default output channel to the rest of the main flow. However, in that old version we missed the fact that falling back to a default output is not the router's default behavior (see the resolutionRequired = true option). So we can't assume that end users always want a fallback to the main flow. Moreover, it gives the impression that you can always configure a router in the middle of a flow. Starting with version 5.0, we aligned IntegrationFlowDefinition with the nature of the router and introduced an option to make that assumption explicit:

/**
 * Make a default output mapping of the router to the parent flow.
 * Use the next, after router, parent flow {@link MessageChannel} as a
 * {@link AbstractMessageRouter#setDefaultOutputChannel(MessageChannel)} of this router.
 * @return the router spec.
 */
public S defaultOutputToParentFlow() {

And there is a doc on the matter: https://docs.spring.io/spring-integration/docs/5.3.2.RELEASE/reference/html/dsl.html#java-dsl-routers
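
As a rough illustration of that option, here is a minimal sketch of a flow that opts in to the fallback explicitly. It assumes a lambda-based router configured through the router spec; the class name RouterFallbackSketch, the bean name routingFlow, the channel name "swift" and the payload check are hypothetical and not taken from the question. Whether the same spec option is reachable when a custom AbstractMessageRouter bean is passed to route(...) is not shown here; for such a bean, the setter named in the Javadoc above, setDefaultOutputChannel(MessageChannel), is the property this option sets.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;

// Hypothetical configuration class, for illustration only.
@Configuration
public class RouterFallbackSketch {

    @Bean
    public IntegrationFlow routingFlow() {
        return flow -> flow
                // Assumed routing rule: payloads starting with "swift" are routed away.
                .<String, Boolean>route(payload -> payload.startsWith("swift"),
                        router -> router
                                // "swift" is an assumed channel bean name.
                                .channelMapping(true, "swift")
                                // Do not fail when a key (here: false) has no mapping.
                                .resolutionRequired(false)
                                // Explicitly continue in the parent flow for unmapped keys.
                                .defaultOutputToParentFlow())
                // Reached only by messages the router did not send elsewhere.
                .handle(message -> System.out.println("Handled inline: " + message.getPayload()));
    }
}
```

Without the defaultOutputToParentFlow() call, the router would be treated as the end of the flow, and adding .handle(...) after it would be expected to fail at startup with the error quoted above.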

Upvotes: 2