Sergei Ledvanov

Reputation: 2251

Spring integration multiple threads for QueueChannel

I have the following configuration for the integration flow

@Bean(name = PollerMetadata.DEFAULT_POLLER)
PollerMetadata poller() {
    return Pollers.fixedRate(100)
            .maxMessagesPerPoll(1)
            .errorHandler(errorHandler)
            .get();
}

@Bean
IntegrationFlow f() {
    return IntegrationFlows.from(
            MessageChannels.queue("f.input", 500))
            ...
            .get();
}

As far as I can see, the f flow is asynchronous: when a controller calls a gateway that publishes a message to the f.input queue, the call returns immediately, no matter what the f flow does. This is exactly what I want; however, I do not understand:

  1. What executor handles this async flow?
  2. How many threads are created in the background to back this flow up?
  3. Is my assumption correct that until one item in the queue finishes its execution, all other items in the queue keep waiting (so the concurrency for this queue is 1)?
  4. How can I make this flow execute 2 or 3 queue items simultaneously?

Sorry for multiple questions, but I suppose all of them can be answered by just one example.

Upvotes: 2

Views: 2612

Answers (1)

Gary Russell

Reputation: 174484

Unless already defined by the application, the framework creates a "taskScheduler" bean. This has 10 threads by default and is shared across all pollers (and other scheduled tasks in the framework). You can increase the number of threads or completely override the default bean.
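As a rough sketch of overriding the default bean, assuming Spring's ThreadPoolTaskScheduler is used: the bean has to be named "taskScheduler" for the framework to pick it up instead of creating its own. The class name, the pool size of 20 and the thread name prefix below are arbitrary values chosen for illustration.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

@Configuration
class SchedulerConfig {

    // The method name gives the bean the name "taskScheduler", so the
    // framework uses this scheduler rather than creating its default one.
    @Bean
    ThreadPoolTaskScheduler taskScheduler() {
        ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
        scheduler.setPoolSize(20);                  // illustrative value
        scheduler.setThreadNamePrefix("si-sched-"); // illustrative value
        return scheduler;
    }
}
```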

Your understanding is correct: the next poll is not scheduled until the current one finishes.

To run multiple requests concurrently, you can add a taskExecutor (such as a ThreadPoolTaskExecutor) to the poller. The poller thread then hands each message off to one of the executor's threads, and the next poll is scheduled immediately.
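A minimal sketch of the poller from the question with a task executor added, assuming Spring Integration 5.x package names. The class name, the flowExecutor bean name, its pool size of 3 and the thread name prefix are hypothetical; errorHandler stands for whatever handler the question's configuration already uses.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.scheduling.PollerMetadata;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.ErrorHandler;

@Configuration
class PollerConfig {

    // Assumed to be the handler from the question's configuration;
    // qualify the injection if several ErrorHandler beans exist.
    private final ErrorHandler errorHandler;

    PollerConfig(ErrorHandler errorHandler) {
        this.errorHandler = errorHandler;
    }

    // Hypothetical worker pool: up to 3 messages are processed at once.
    // Its work queue is left at the default here, which is unbounded.
    @Bean
    ThreadPoolTaskExecutor flowExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(3);
        executor.setMaxPoolSize(3);
        executor.setThreadNamePrefix("f-worker-");
        return executor;
    }

    // Same poller as in the question, plus the hand-off to the pool.
    @Bean(name = PollerMetadata.DEFAULT_POLLER)
    PollerMetadata poller(ThreadPoolTaskExecutor flowExecutor) {
        return Pollers.fixedRate(100)
                .maxMessagesPerPoll(1)
                .taskExecutor(flowExecutor)
                .errorHandler(errorHandler)
                .get();
    }
}
```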

You need to take care, though, that you don't poll faster than the pool can keep up; otherwise the task executor's work queue will keep growing.

You can solve that problem by limiting the executor's queue size and setting its RejectedExecutionHandler to a ThreadPoolExecutor.CallerRunsPolicy, which causes the poller thread itself to run the task when the queue is full.
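The effect of a bounded queue combined with CallerRunsPolicy can be seen with plain java.util.concurrent, outside Spring. This is only an illustrative sketch: the class and method names are made up, and the pool of one thread with a queue of one slot is deliberately tiny so that the third task is certain to be rejected and run by the submitting thread.

```java
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class CallerRunsDemo {

    // Submits three tasks and returns, per task number, the name of the
    // thread that ran it.
    static Map<Integer, String> runThreeTasks() {
        Map<Integer, String> ranOn = new ConcurrentHashMap<>();
        CountDownLatch release = new CountDownLatch(1);

        // One worker thread, one queue slot, caller runs rejected tasks.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        try {
            // Task 1 occupies the only worker until it is released.
            pool.execute(() -> {
                ranOn.put(1, Thread.currentThread().getName());
                awaitQuietly(release);
            });
            // Task 2 fills the single queue slot.
            pool.execute(() -> ranOn.put(2, Thread.currentThread().getName()));
            // Task 3 is rejected, so CallerRunsPolicy runs it right here,
            // on the submitting thread, which slows the submitter down.
            pool.execute(() -> ranOn.put(3, Thread.currentThread().getName()));
            release.countDown();
        } finally {
            pool.shutdown();
        }
        try {
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("pool did not terminate");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return ranOn;
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        // Task 3 is reported on the calling thread, tasks 1 and 2 on the worker.
        runThreeTasks().forEach((task, thread) ->
                System.out.println("task " + task + " ran on " + thread));
    }
}
```

In the poller setup, the submitting thread is the poller thread, so a full queue delays the next poll instead of piling up more work.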

Alternatively, you can use an ExecutorChannel configured with the task executor and avoid the poller altogether. The controller thread will simply hand off to the task executor, but you'll still need a CallerRunsPolicy.
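A sketch of the ExecutorChannel variant, again assuming Spring Integration 5.x package names. The class name, the boundedExecutor bean name, the pool size of 3 and the queue capacity of 10 are hypothetical, and the handle(...) step is only a placeholder for the part of the flow that the question elides.

```java
import java.util.concurrent.ThreadPoolExecutor;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
class ExecutorChannelConfig {

    // Bounded pool: when all 3 threads are busy and the 10 queue slots are
    // full, the sending (controller) thread runs the task itself.
    @Bean
    ThreadPoolTaskExecutor boundedExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(3);   // illustrative value
        executor.setMaxPoolSize(3);    // illustrative value
        executor.setQueueCapacity(10); // illustrative value
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        return executor;
    }

    // The input channel dispatches on the executor, so no poller is involved.
    @Bean
    IntegrationFlow f(ThreadPoolTaskExecutor boundedExecutor) {
        return IntegrationFlows.from(
                MessageChannels.executor("f.input", boundedExecutor))
                // Placeholder handler standing in for the real flow steps.
                .handle(message -> System.out.println(
                        Thread.currentThread().getName() + ": " + message.getPayload()))
                .get();
    }
}
```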

Upvotes: 2
