Marc Tarin
Marc Tarin

Reputation: 3169

Get the output from a message sent to the control bus

I set up a control bus with Spring Integration DSL:

// https://docs.spring.io/spring-integration/reference/html/control-bus.html
// https://stackoverflow.com/a/45269746/5873923
@Configuration
public class ControlBus {
    @Bean
    public IntegrationFlow controlBusFlow() {
        return IntegrationFlows.from(controlChannel()).controlBus().get();
    }

    @Bean
    public MessageChannel controlChannel() {
        return MessageChannels.direct().get();
    }
}

With this, I'm able to start/stop the inbound from some integration flow, with:

controlChannel.send(new GenericMessage<>("@myInbound.start()"));
controlChannel.send(new GenericMessage<>("@myInbound.stop()"));

The .send method returns true or false depending on the message being sent or not. How can I check the status of the bean?

controlChannel.send(new GenericMessage<>("@myInbound.isRunning()"));

will also return true or false,

new MessagingTemplate().send(controlChannel, new GenericMessage<>("@myInbound.isRunning()"));

will just send the message and return nothing

new MessagingTemplate().sendAndReceive(controlChannel, new GenericMessage<>("@myInbound.isRunning()"));

sends the message and hangs waiting for a response.

How can I properly configure the output for the control bus and return it?

Upvotes: 0

Views: 42

Answers (1)

Artem Bilan
Artem Bilan

Reputation: 121427

Works as expected:

@SpringJUnitConfig
public class So74741707Tests {

    @Autowired
    @Qualifier("controlBusFlow.input")
    MessageChannel controlBusFlowInput;

    @Test
    void receiveReplyFromControlBus() {
        MessagingTemplate messagingTemplate = new MessagingTemplate();
        assertThat(
                messagingTemplate.sendAndReceive(this.controlBusFlowInput,
                        new GenericMessage<>("@myEndpoint.isRunning()")))
                .extracting(Message::getPayload)
                .asInstanceOf(InstanceOfAssertFactories.BOOLEAN)
                .isFalse();

        messagingTemplate.convertAndSend(this.controlBusFlowInput, "@myEndpoint.start()");

        assertThat(
                messagingTemplate.sendAndReceive(this.controlBusFlowInput,
                        new GenericMessage<>("@myEndpoint.isRunning()")))
                .extracting(Message::getPayload)
                .asInstanceOf(InstanceOfAssertFactories.BOOLEAN)
                .isTrue();
    }

    @Configuration
    @EnableIntegration
    public static class TestConfiguration {

        @Bean
        public IntegrationFlow controlBusFlow() {
            return BaseIntegrationFlowDefinition::controlBus;
        }

        @ServiceActivator(inputChannel = "inputChannel", autoStartup = "false")
        @EndpointId("myEndpoint")
        public void myService(Object payload) {

        }

    }

}

Probably you more than just that Control Bus subscribers on your controlChannel.

The start() and stop() are really one-way operations. Therefore indeed just send() is enough. The isRunning() returns a boolean and to handle it we have to perform a request-reply operation or send the reply message to the output channel of that Control Bus endpoint.

Upvotes: 1

Related Questions