Roman
Roman

Reputation: 101

Spring Integration: Error by using subFlowMapping: failed to look up MessageChannel with name 'true'

I have got this error in my 'normal' code and was able to reproduce it in one simple junit test. This error is thrown by using .route(...)..subFlowMapping(...) If I comment .route(..) code out then test runs successful.

Could please someone explain why does Spring Integration try to find 'true' channel instead to route to true subflow?

@Test
public void testDynaSubFlowCreation() {
    Flux<Message<?>> messageFlux = Flux.just("1,2,3,4").map(v -> v.split(",")).flatMapIterable(Arrays::asList)
            .map(Integer::parseInt).map(GenericMessage<Integer>::new);

    QueueChannel resultChannel = new QueueChannel();

    IntegrationFlow integrationFlow = IntegrationFlows
            .from(messageFlux)
            .<Integer, Boolean>route(p->p % 2 == 0, m->m
                    .subFlowMapping(true, sf-> sf.<Integer, String>transform(em->{return "even:"+em;}).log())
                    .subFlowMapping(false, sf-> sf.<Integer, String>transform(em->{return "odd:"+em;}).log())
                    .defaultOutputToParentFlow()
                    )
            .log(l -> "!!!!!!!!!!!!!!!!!!!!!!! end int="+l)
            .channel(resultChannel)
            .get();

    this.flowContext.registration(integrationFlow).register();

    int queueSize = resultChannel.getQueueSize();
    assertThat(queueSize).as("queueSize").isEqualTo(4);
}

2018-02-21 17:33:27,541 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
2018-02-21 17:33:28,184 [main] DEBUG o.s.i.handler.LoggingHandler - _org.springframework.integration.errorLogger.handler received message: ErrorMessage [payload=org.springframework.messaging.MessagingException: failed to resolve channel name 'false'; nested exception is org.springframework.messaging.core.DestinationResolutionException: failed to look up MessageChannel with name 'false' in the BeanFactory.; nested exception is org.springframework.beans.factory.NoSuchBeanDefinitionException: No bean named 'false' available, failedMessage=GenericMessage [payload=1, headers={id=684db1df-ee6e-f35e-8332-168cee2a1681, timestamp=1519230808175}], headers={id=621649c0-b8a0-4573-9220-c38891544043, timestamp=1519230808181}]
2018-02-21 17:33:28,188 [main] ERROR o.s.i.handler.LoggingHandler - org.springframework.messaging.MessagingException: failed to resolve channel name 'false'; nested exception is org.springframework.messaging.core.DestinationResolutionException: failed to look up MessageChannel with name 'false' in the BeanFactory.; nested exception is org.springframework.beans.factory.NoSuchBeanDefinitionException: No bean named 'false' available, failedMessage=GenericMessage [payload=1, headers={id=684db1df-ee6e-f35e-8332-168cee2a1681, timestamp=1519230808175}]
    at org.springframework.integration.router.AbstractMappingMessageRouter.resolveChannelForName(AbstractMappingMessageRouter.java:227)
    at org.springframework.integration.router.AbstractMappingMessageRouter.addChannelFromString(AbstractMappingMessageRouter.java:258)
    at org.springframework.integration.router.AbstractMappingMessageRouter.addToCollection(AbstractMappingMessageRouter.java:296)
    at org.springframework.integration.router.AbstractMappingMessageRouter.determineTargetChannels(AbstractMappingMessageRouter.java:186)
    at org.springframework.integration.router.AbstractMessageRouter.handleMessageInternal(AbstractMessageRouter.java:171)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:141)
    at org.springframework.integration.handler.AbstractMessageHandler.onNext(AbstractMessageHandler.java:165)
    at org.springframework.integration.handler.AbstractMessageHandler.onNext(AbstractMessageHandler.java:51)
    at org.springframework.integration.endpoint.ReactiveStreamsConsumer$1.hookOnNext(ReactiveStreamsConsumer.java:138)
    at org.springframework.integration.endpoint.ReactiveStreamsConsumer$1.hookOnNext(ReactiveStreamsConsumer.java:127)
    at reactor.core.publisher.BaseSubscriber.onNext(BaseSubscriber.java:158)
    at reactor.core.publisher.FluxRetry$RetrySubscriber.onNext(FluxRetry.java:79)
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:185)
    at reactor.core.publisher.FluxPublish$PublishSubscriber.drain(FluxPublish.java:457)
    at reactor.core.publisher.FluxPublish$PublishSubscriber.onNext(FluxPublish.java:263)
    at reactor.core.publisher.FluxCreate$IgnoreSink.next(FluxCreate.java:571)
    at reactor.core.publisher.FluxCreate$SerializedSink.next(FluxCreate.java:146)
    at org.springframework.integration.channel.FluxMessageChannel.doSend(FluxMessageChannel.java:65)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:438)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:388)
    at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.poll(FluxPeekFuseable.java:310)
    at reactor.core.publisher.FluxPublish$PublishSubscriber.drain(FluxPublish.java:411)
    at reactor.core.publisher.FluxPublish$PublishSubscriber.onSubscribe(FluxPublish.java:224)
    at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onSubscribe(FluxPeekFuseable.java:172)
    at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onSubscribe(FluxPeekFuseable.java:172)
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:90)
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:90)
    at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.onSubscribe(FluxFlattenIterable.java:206)
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:90)
    at reactor.core.publisher.FluxJust.subscribe(FluxJust.java:68)
    at reactor.core.publisher.FluxMapFuseable.subscribe(FluxMapFuseable.java:63)
    at reactor.core.publisher.FluxFlattenIterable.subscribe(FluxFlattenIterable.java:107)
    at reactor.core.publisher.FluxMapFuseable.subscribe(FluxMapFuseable.java:63)
    at reactor.core.publisher.FluxMapFuseable.subscribe(FluxMapFuseable.java:63)
    at reactor.core.publisher.FluxPeekFuseable.subscribe(FluxPeekFuseable.java:86)
    at reactor.core.publisher.FluxPeekFuseable.subscribe(FluxPeekFuseable.java:86)
    at reactor.core.publisher.FluxPublish.connect(FluxPublish.java:99)
    at reactor.core.publisher.ConnectableFlux.connect(ConnectableFlux.java:99)
    at java.util.concurrent.ConcurrentHashMap$ValuesView.forEach(ConcurrentHashMap.java:4707)
    at org.springframework.integration.channel.FluxMessageChannel.subscribe(FluxMessageChannel.java:77)
    at org.springframework.integration.endpoint.ReactiveStreamsConsumer.doStart(ReactiveStreamsConsumer.java:127)
    at org.springframework.integration.endpoint.AbstractEndpoint.start(AbstractEndpoint.java:162)
    at org.springframework.integration.config.ConsumerEndpointFactoryBean.start(ConsumerEndpointFactoryBean.java:349)
    at org.springframework.integration.dsl.StandardIntegrationFlow.start(StandardIntegrationFlow.java:102)
    at org.springframework.integration.dsl.context.IntegrationFlowRegistration.start(IntegrationFlowRegistration.java:136)
    at org.springframework.integration.dsl.context.IntegrationFlowContext.register(IntegrationFlowContext.java:108)
    at org.springframework.integration.dsl.context.IntegrationFlowContext.access$500(IntegrationFlowContext.java:66)
    at org.springframework.integration.dsl.context.IntegrationFlowContext$IntegrationFlowRegistrationBuilder.register(IntegrationFlowContext.java:238)
    at com.lr.m3integration.DynaFlowTests.testDynaSubFlowCreation(DynaFlowTests.java:124)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.springframework.test.context.junit4.statements.RunBeforeTestExecutionCallbacks.evaluate(RunBeforeTestExecutionCallbacks.java:73)
    at org.springframework.test.context.junit4.statements.RunAfterTestExecutionCallbacks.evaluate(RunAfterTestExecutionCallbacks.java:83)
    at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
    at org.springframework.test.context.junit4.statements.RunBeforeTestMethodCallbacks.evaluate(RunBeforeTestMethodCallbacks.java:75)
    at org.springframework.test.context.junit4.statements.RunAfterTestMethodCallbacks.evaluate(RunAfterTestMethodCallbacks.java:86)
    at org.springframework.test.context.junit4.statements.SpringRepeat.evaluate(SpringRepeat.java:84)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:251)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:97)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
    at org.springframework.test.context.junit4.statements.RunBeforeTestClassCallbacks.evaluate(RunBeforeTestClassCallbacks.java:61)
    at org.springframework.test.context.junit4.statements.RunAfterTestClassCallbacks.evaluate(RunAfterTestClassCallbacks.java:70)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:190)
    at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:86)
    at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:538)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:760)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:460)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:206)
Caused by: org.springframework.messaging.core.DestinationResolutionException: failed to look up MessageChannel with name 'false' in the BeanFactory.; nested exception is org.springframework.beans.factory.NoSuchBeanDefinitionException: No bean named 'false' available
    at org.springframework.integration.support.channel.BeanFactoryChannelResolver.resolveDestination(BeanFactoryChannelResolver.java:117)
    at org.springframework.integration.support.channel.BeanFactoryChannelResolver.resolveDestination(BeanFactoryChannelResolver.java:46)
    at org.springframework.integration.router.AbstractMappingMessageRouter.resolveChannelForName(AbstractMappingMessageRouter.java:223)
    ... 80 more
Caused by: org.springframework.beans.factory.NoSuchBeanDefinitionException: No bean named 'false' available
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.getBeanDefinition(DefaultListableBeanFactory.java:687)
    at org.springframework.beans.factory.support.AbstractBeanFactory.getMergedLocalBeanDefinition(AbstractBeanFactory.java:1205)
    at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:292)
    at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:205)
    at org.springframework.integration.support.channel.BeanFactoryChannelResolver.resolveDestination(BeanFactoryChannelResolver.java:89)
    ... 82 more

@artem-bilan:
I'm not completely sure that dynamically registration is a root of a problem. I configured the same flow statically, like this:

@EnableIntegration
@Configuration
public class IntegrationTestConfig {

    @Bean
    public QueueChannel resultChannel() {
        return new QueueChannel();
    }

    @Bean
    public Flux<Message<?>> messageFlux(){
        return Flux.just("1,2,3,4").map(v -> v.split(",")).flatMapIterable(Arrays::asList)
                .map(Integer::parseInt).map(GenericMessage<Integer>::new);
    }

    @Bean
    public MessageChannel inputChannel() {
        return MessageChannels.direct().get();
    }

    @Bean
    public IntegrationFlow withSubFlows() {
        return IntegrationFlows
//              .from(inputChannel())
                .from(messageFlux())
                .<Integer, Boolean>route(p->p % 2 == 0, m->m
                        .subFlowMapping(true, sf-> sf.<Integer, String>transform(em->{return "even:"+em;}).log().bridge())
                        .subFlowMapping(false, sf-> sf.<Integer, String>transform(em->{return "odd:"+em;}).log().bridge())
                        .defaultOutputToParentFlow()
                        )
                .log(l -> "!!!!!!!!!!!!!!!!!!!!!!! end int="+l)
                .channel(resultChannel())
                .get();
    }
}

@RunWith(SpringRunner.class)
@SpringIntegrationTest
@SpringBootTest
    @Autowired
    public QueueChannel resultChannel;

    @Autowired
    public MessageChannel inputChannel;

    @Test
    public void testSubFlowCreation() {
        // fill input channel
        IntStream.range(0,4).forEach(i-> inputChannel.send(MessageBuilder.withPayload(i).build()));

        // receive results after flow processing
        assertThat(resultChannel.getReceiveCount()).as("receiveCount").isEqualTo(0);
        assertThat(resultChannel.receive(1000)).as("message").isNotNull();
        assertThat(resultChannel.getQueueSize()).as("queueSize").isEqualTo(3);
    }
}

By executing with input=messageFlux() you will get the same error. By executing with input=inputChannel() junit test runs green.

Upvotes: 1

Views: 1291

Answers (1)

Artem Bilan
Artem Bilan

Reputation: 121550

Confirmed. This is a bug in the RouterSpec. We can't use subFlowMapping() for dynamically registered flows for now. Just because the logic there is based on the ContextRefreshedEvent, which happens only once in the application context startup: https://jira.spring.io/browse/INT-4411

Upvotes: 1

Related Questions