Reputation: 1479
I have an application based on Spring Integration and a part of that application has the following config.
It worked fine for the version 4.3.10.RELEASE. But now I am trying to upgrade the version to 5.1.5.RELEASE and the application fails to start with the exception mention below.
I was able to resolve the issue with changing the config but I am wondering why this has been changed
Old config working in 4.3.10.RELEASE:
@Bean
public IntegrationFlow mainFlow(@Qualifier("initChannel") PublishSubscribeChannel inboundChannel, TaskExecutor taskExecutor,
StatusUpdater statusUpdater, MyTransformer myTransformer, MyFilter myFilter,
AbstractMessageRouter messageRouter, FinalHandler finalHandler, InlineHandler inlineHandler) {
return flow -> flow.channel(inboundChannel)
.channel(new ExecutorSubscribableChannel(Executors.newFixedThreadPool(2)))
.publishSubscribeChannel(taskExecutor, spec -> spec.subscribe(flow1 ->
flow1.handle(statusUpdater, "handle"))
.subscribe(flow2 -> flow2.transform(myTransformer)
.filter(myFilter, "filter")
.route(messageRouter)
.handle(inlineHandler, "handle")
).errorHandler(new MessagePublishingErrorHandler())
);
}
Exception in the logs with above config for version 5.1.5.RELEASE:
Caused by: org.springframework.beans.factory.BeanCreationException: The 'currentComponent' (messageRouter) is a one-way 'MessageHandler' and it isn't appropriate to configure 'outputChannel'. This is the end of the integration flow. at org.springframework.integration.dsl.IntegrationFlowDefinition.registerOutputChannelIfCan(IntegrationFlowDefinition.java:3088) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE] at org.springframework.integration.dsl.IntegrationFlowDefinition.register(IntegrationFlowDefinition.java:3033) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE] at org.springframework.integration.dsl.IntegrationFlowDefinition.handle(IntegrationFlowDefinition.java:1185) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE] at org.springframework.integration.dsl.IntegrationFlowDefinition.handle(IntegrationFlowDefinition.java:1001) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE] at org.springframework.integration.dsl.IntegrationFlowDefinition.handle(IntegrationFlowDefinition.java:979) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE] at com.config.IntegrationConfig.lambda$null$1(IntegrationConfig.java:42) ~[classes/:na] at org.springframework.integration.dsl.PublishSubscribeSpec.subscribe(PublishSubscribeSpec.java:58) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE] at com.config.IntegrationConfig.lambda$null$2(IntegrationConfig.java:38) ~[classes/:na] at org.springframework.integration.dsl.IntegrationFlowDefinition.publishSubscribeChannel(IntegrationFlowDefinition.java:255) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE] at com.config.IntegrationConfig.lambda$mainFlow$3(IntegrationConfig.java:36) ~[classes/:na] at org.springframework.integration.dsl.context.IntegrationFlowBeanPostProcessor.processIntegrationFlowImpl(IntegrationFlowBeanPostProcessor.java:309) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE] at org.springframework.integration.dsl.context.IntegrationFlowBeanPostProcessor.postProcessBeforeInitialization(IntegrationFlowBeanPostProcessor.java:117) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE] at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.applyBeanPostProcessorsBeforeInitialization(AbstractAutowireCapableBeanFactory.java:414) ~[spring-beans-5.1.7.RELEASE.jar:5.1.7.RELEASE] at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1770) ~[spring-beans-5.1.7.RELEASE.jar:5.1.7.RELEASE] at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:593) ~[spring-beans-5.1.7.RELEASE.jar:5.1.7.RELEASE]
Updated config working for 5.1.5.RELEASE:
The processing happening in inlineHandler
bean is now happening in a channel/flow of its own instead of being inline. I had to change the messageRouter
bean route the message to default flow which was happening inline earlier.
@Bean
public IntegrationFlow mainFlow(@Qualifier("initChannel") PublishSubscribeChannel inboundChannel, TaskExecutor taskExecutor,
StatusUpdater statusUpdater, MyTransformer myTransformer, MyFilter myFilter,
AbstractMessageRouter messageRouter, FinalHandler finalHandler, InlineHandler inlineHandler) {
return flow -> flow.channel(inboundChannel)
.channel(new ExecutorSubscribableChannel(Executors.newFixedThreadPool(2)))
.publishSubscribeChannel(taskExecutor, spec -> spec.subscribe(flow1 ->
flow1.handle(statusUpdater, "handle"))
.subscribe(flow2 -> flow2.transform(myTransformer)
.filter(myFilter, "filter")
.route(messageRouter)
//.handle(inlineHandler, "handle") // removed this
).errorHandler(new MessagePublishingErrorHandler())
);
}
@Bean
public AbstractMessageRouter messageRouter(@Qualifier("swift") MessageChannel messageChannel, MessageChannel defaultFlow) {
return new AbstractMessageRouter() {
@Override
protected Collection<MessageChannel> determineTargetChannels(Message<?> message) {
System.out.println(">>>>>> Determining channel");
if(//some cond) {
return Collections.singleton(defaultFlow);
}
return Collections.singleton(messageChannel);
}
};
}
Upvotes: 0
Views: 147
Reputation: 121552
I am wondering why this has been changed
Well, let's see that error message again:
The 'currentComponent' (messageRouter) is a one-way 'MessageHandler' and it isn't appropriate to configure 'outputChannel'. This is the end of the integration flow.
Now let's take a look into the router and get its difference, e.g. from service activator:
As you see there is a mapping
for the router to determine the further channel to send the message to. In your old config:
.route(messageRouter)
.handle(inlineHandler, "handle")
You just had that unconditional mapping for the default channel
providing this chain of methods. However in that old version we missed the fact that falling back to the default output is not a default (see resolutionRequired = true
option). So, we can't make an assumption that end-users always want a fallback to the main flow back. More over it produces an impression that you always can configure router in the middle of the flow. Starting with version 5.0
we aligned IntegrationFlowDefinition
with router nature and have introduced an option to make assumption explicit:
/**
* Make a default output mapping of the router to the parent flow.
* Use the next, after router, parent flow {@link MessageChannel} as a
* {@link AbstractMessageRouter#setDefaultOutputChannel(MessageChannel)} of this router.
* @return the router spec.
*/
public S defaultOutputToParentFlow() {
And there is a doc on the matter: https://docs.spring.io/spring-integration/docs/5.3.2.RELEASE/reference/html/dsl.html#java-dsl-routers
Upvotes: 2