Nick Tsitlakidis

Reputation: 2479

How to postpone message consumption in Spring Integration

I'm working on an application using Spring Integration 5.0.1 and Spring Boot 2.0.0.RC1.

Currently, the application responds to the ApplicationReadyEvent and runs some initialization code that might take a while to finish. This code uses none of the Spring Integration components.

I also have some fairly basic integration flows, written with the Java DSL and declared as beans in a configuration class.

Is there any way to postpone when the flows will start consuming messages? I want to be able to start them manually when the initialization is finished.

It seems that configuring a ControlBus would be the solution, but I have no idea how to connect something like that with the other flows.

The following is a sample of how the flow consumes messages:

IntegrationFlows.from(sourceGateway)
        .transform(Transformers.fromJson(IncomingTask.class, jsonObjectMapper))
        .handle(IncomingTask.class, (incomingTask, headers) -> {
            // stuff with the task here.
            return null; // no reply, so the flow ends here
        })
        .get();

Upvotes: 1

Views: 334

Answers (1)

Artem Bilan

Reputation: 121272

Right, you can definitely use a Control Bus for this. With the Java DSL it looks like this:

@Bean
public IntegrationFlow controlBus() {
    return IntegrationFlowDefinition::controlBus;
}

To send commands to it, inject its input channel (the bean name is the flow's bean name plus ".input"):

@Autowired
@Qualifier("controlBus.input")
private MessageChannel controlBusChannel;

Now we need to know how your target IntegrationFlow starts, that is, which endpoint consumes the messages. For example, I have this:

@Bean
public IntegrationFlow fileFlow1() {
    return IntegrationFlows.from("fileFlow1Input")
            .handle(Files.outboundAdapter(tmpDir.getRoot()),
                    c -> c.id("fileWriting").autoStartup(false))
            .get();
}

Pay attention to c.id("fileWriting").autoStartup(false). The id names the endpoint bean, so a command sent to the Control Bus can refer to it. The autoStartup(false) means the endpoint does not consume messages immediately, but only after start() is called on it. I do that like this:

this.controlBusChannel.send(new GenericMessage<>("@fileWriting.start()"));
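To make the mechanics of that command concrete, here is a toy model in plain Java. It is not how Spring Integration implements the Control Bus (the real one evaluates the payload as a SpEL expression against the beans in the application context); `ToyControlBus` and its nested `Endpoint` class are hypothetical names made up for this illustration. It only shows the idea: the id selects a registered endpoint, and the command flips it from stopped to running.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Toy model only: the real Control Bus evaluates the command as a SpEL
// expression against the application context, not with a regex.
final class ToyControlBus {

    /** Stand-in for an endpoint bean with a start/stop lifecycle. */
    static final class Endpoint {
        private boolean running;

        Endpoint(boolean autoStartup) {
            // autoStartup == false means the endpoint is created stopped.
            this.running = autoStartup;
        }

        void start() { running = true; }

        void stop() { running = false; }

        boolean isRunning() { return running; }
    }

    // Accepts only commands of the form "@someId.start()" or "@someId.stop()".
    private static final Pattern COMMAND =
            Pattern.compile("@(\\w+)\\.(start|stop)\\(\\)");

    private final Map<String, Endpoint> endpointsById = new HashMap<>();

    void register(String id, Endpoint endpoint) {
        endpointsById.put(id, endpoint);
    }

    /** Looks up the endpoint named in the command and starts or stops it. */
    void send(String command) {
        Matcher m = COMMAND.matcher(command);
        if (!m.matches()) {
            throw new IllegalArgumentException("Unsupported command: " + command);
        }
        Endpoint endpoint = endpointsById.get(m.group(1));
        if (endpoint == null) {
            throw new IllegalArgumentException("No endpoint with id: " + m.group(1));
        }
        if ("start".equals(m.group(2))) {
            endpoint.start();
        } else {
            endpoint.stop();
        }
    }

    public static void main(String[] args) {
        ToyControlBus bus = new ToyControlBus();
        Endpoint fileWriting = new Endpoint(false); // like autoStartup(false)
        bus.register("fileWriting", fileWriting);

        System.out.println(fileWriting.isRunning()); // false
        bus.send("@fileWriting.start()");
        System.out.println(fileWriting.isRunning()); // true
    }
}
```

In the real framework the lookup is done by bean name, which is why the endpoint needs an explicit id.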

You need something similar in your configuration to postpone message consumption until the moment you want it to begin.
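Tied back to the question, a minimal sketch of the wiring might look like the following. It assumes the controlBus flow and the "fileWriting" endpoint id from this answer, and it needs Spring Boot and Spring Integration on the classpath. The class name `DeferredFlowStarter` and the `runInitialization()` method are hypothetical placeholders for your own initialization code.

```java
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.stereotype.Component;

// Hypothetical component: runs the slow initialization first and only
// then starts the endpoint that was declared with autoStartup(false).
@Component
public class DeferredFlowStarter {

    private final MessageChannel controlBusChannel;

    public DeferredFlowStarter(
            @Qualifier("controlBus.input") MessageChannel controlBusChannel) {
        this.controlBusChannel = controlBusChannel;
    }

    @EventListener(ApplicationReadyEvent.class)
    public void onApplicationReady() {
        runInitialization();
        // Same command as above: resolve the endpoint by id and start it.
        this.controlBusChannel.send(new GenericMessage<>("@fileWriting.start()"));
    }

    private void runInitialization() {
        // Placeholder (assumption): the existing long-running startup code.
    }
}
```

For your own flow, which endpoint gets the id and autoStartup(false) depends on what sourceGateway actually is; the point is to keep stopped whichever endpoint pulls messages into the flow, and to start it from the listener.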

Upvotes: 1
