Reputation: 3169
I set up a control bus with Spring Integration DSL:
// https://docs.spring.io/spring-integration/reference/html/control-bus.html
// https://stackoverflow.com/a/45269746/5873923
@Configuration
public class ControlBus {

    @Bean
    public IntegrationFlow controlBusFlow() {
        return IntegrationFlows.from(controlChannel()).controlBus().get();
    }

    @Bean
    public MessageChannel controlChannel() {
        return MessageChannels.direct().get();
    }
}
With this, I can start and stop the inbound endpoint of an integration flow:
controlChannel.send(new GenericMessage<>("@myInbound.start()"));
controlChannel.send(new GenericMessage<>("@myInbound.stop()"));
The .send method returns true or false depending on whether the message was sent.
How can I check the status of the bean?
controlChannel.send(new GenericMessage<>("@myInbound.isRunning()"));
also returns only true or false,
new MessagingTemplate().send(controlChannel, new GenericMessage<>("@myInbound.isRunning()"));
just sends the message and returns nothing, and
new MessagingTemplate().sendAndReceive(controlChannel, new GenericMessage<>("@myInbound.isRunning()"));
sends the message and hangs waiting for a response.
How can I properly configure the output for the control bus and return it?
Upvotes: 0
Views: 42
Reputation: 121427
Works as expected:
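import static org.assertj.core.api.Assertions.assertThat;

import org.assertj.core.api.InstanceOfAssertFactories;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.EndpointId;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.core.MessagingTemplate;
import org.springframework.integration.dsl.BaseIntegrationFlowDefinition;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;

@SpringJUnitConfig
public class So74741707Tests {

    // Input channel of the lambda-based flow below: "<flow bean name>.input"
    @Autowired
    @Qualifier("controlBusFlow.input")
    MessageChannel controlBusFlowInput;

    @Test
    void receiveReplyFromControlBus() {
        MessagingTemplate messagingTemplate = new MessagingTemplate();

        // The endpoint is declared with autoStartup = "false", so it is not running yet
        assertThat(
                messagingTemplate.sendAndReceive(this.controlBusFlowInput,
                        new GenericMessage<>("@myEndpoint.isRunning()")))
                .extracting(Message::getPayload)
                .asInstanceOf(InstanceOfAssertFactories.BOOLEAN)
                .isFalse();

        // start() produces no reply, so a one-way send is enough
        messagingTemplate.convertAndSend(this.controlBusFlowInput, "@myEndpoint.start()");

        assertThat(
                messagingTemplate.sendAndReceive(this.controlBusFlowInput,
                        new GenericMessage<>("@myEndpoint.isRunning()")))
                .extracting(Message::getPayload)
                .asInstanceOf(InstanceOfAssertFactories.BOOLEAN)
                .isTrue();
    }

    @Configuration
    @EnableIntegration
    public static class TestConfiguration {

        @Bean
        public IntegrationFlow controlBusFlow() {
            return BaseIntegrationFlowDefinition::controlBus;
        }

        @ServiceActivator(inputChannel = "inputChannel", autoStartup = "false")
        @EndpointId("myEndpoint")
        public void myService(Object payload) {
        }
    }
}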
You probably have more subscribers than just that Control Bus on your controlChannel.
The start() and stop() are really one-way operations, so a plain send() is indeed enough. The isRunning() returns a boolean, and to handle it we have to perform a request-reply operation or send the reply message to the output channel of that Control Bus endpoint.
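To illustrate why an extra subscriber could make sendAndReceive() hang: a DirectChannel distributes messages to its subscribers round-robin by default, so a request may land on a subscriber that never replies. The following is only a toy model in plain Java, not Spring Integration code; ToyDirectChannel and its methods are hypothetical names invented for this sketch, and the timeout stands in for the receive timeout you would configure on a real template.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

/** Toy model only: this is NOT how Spring Integration is implemented. */
public class RoundRobinReplyDemo {

    /** Hypothetical stand-in for a direct channel: one subscriber per message, in rotation. */
    public static final class ToyDirectChannel {
        private final List<Function<String, Object>> subscribers = new ArrayList<>();
        private int next = 0;

        public void subscribe(Function<String, Object> handler) {
            subscribers.add(handler);
        }

        /** Request-reply: waits up to timeoutMillis for a reply, returns null if none arrives. */
        public Object sendAndReceive(String request, long timeoutMillis) {
            // Temporary reply queue, playing the role of a per-request reply channel
            BlockingQueue<Object> replyQueue = new ArrayBlockingQueue<>(1);
            Function<String, Object> handler = subscribers.get(next);
            next = (next + 1) % subscribers.size();
            Object reply = handler.apply(request);
            if (reply != null) {
                replyQueue.offer(reply);
            }
            try {
                return replyQueue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null;
            }
        }
    }

    public static void main(String[] args) {
        ToyDirectChannel channel = new ToyDirectChannel();
        // Subscriber 1 plays the control bus and answers the status query
        channel.subscribe(command -> "@myInbound.isRunning()".equals(command) ? Boolean.FALSE : null);
        // Subscriber 2 is some other one-way consumer that never produces a reply
        channel.subscribe(command -> null);

        System.out.println(channel.sendAndReceive("@myInbound.isRunning()", 100)); // prints "false"
        System.out.println(channel.sendAndReceive("@myInbound.isRunning()", 100)); // prints "null"
    }
}
```

In the real application, the second case is the one that blocks, because MessagingTemplate waits for a reply indefinitely by default. Calling setReceiveTimeout(...) on the template should turn the hang into a null result, which makes this situation easier to spot.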
Upvotes: 1