Reputation: 2251
I have the following configuration for the integration flow:
    @Bean(name = PollerMetadata.DEFAULT_POLLER)
    PollerMetadata poller() {
        return Pollers.fixedRate(100)
                .maxMessagesPerPoll(1)
                .errorHandler(errorHandler)
                .get();
    }

    @Bean
    IntegrationFlow f() {
        return IntegrationFlows.from(
                MessageChannels.queue("f.input", 500))
                ...
                .get();
    }
As far as I can see, this f flow is asynchronous: when a controller calls a gateway that publishes a message to the f.input queue, the call returns immediately, regardless of what the f flow does. This is exactly what I want; however, I do not understand:
Sorry for multiple questions, but I suppose all of them can be answered by just one example.
Upvotes: 2
Views: 2612
Reputation: 174484
Unless already defined by the application, the framework creates a "taskScheduler" bean. This has 10 threads by default and is shared across all pollers (and other scheduled tasks in the framework). You can increase the number of threads or completely override the default bean.
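Overriding the default bean could look roughly like the sketch below. It assumes the framework looks the scheduler up by the bean name "taskScheduler"; the class name, the pool size of 20, and the thread name prefix are illustrative choices, not values from the question.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

@Configuration
class SchedulerConfig { // hypothetical class name

    // A bean with this name replaces the framework's 10-thread default.
    @Bean(name = "taskScheduler")
    ThreadPoolTaskScheduler taskScheduler() {
        ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
        scheduler.setPoolSize(20);                      // illustrative value
        scheduler.setThreadNamePrefix("si-scheduler-"); // easier to spot in thread dumps
        return scheduler;
    }
}
```

If only the thread count needs to change, setting spring.integration.taskScheduler.poolSize in META-INF/spring.integration.properties should be enough, without replacing the bean.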
Your understanding is correct, the next poll is not scheduled until the current one finishes.
To run multiple requests concurrently, you can add a taskExecutor (such as a ThreadPoolTaskExecutor) to the poller; the poller thread will then hand off to one of the executor's threads, and the next poll will be scheduled immediately.
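As a rough sketch of that hand-off, the poller from the question could be given an executor like this. It assumes the Spring Integration 5.x Java DSL used in the question; the pollerExecutor bean, its pool size of 5, the class name, and the injected ErrorHandler bean are assumptions made for illustration.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.scheduling.PollerMetadata;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.ErrorHandler;

@Configuration
class PollerConfig { // hypothetical class name

    @Bean
    ThreadPoolTaskExecutor pollerExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(5); // illustrative: up to 5 messages processed at once
        executor.setMaxPoolSize(5);
        executor.setThreadNamePrefix("f-worker-");
        // The work queue is effectively unbounded by default.
        return executor;
    }

    @Bean(name = PollerMetadata.DEFAULT_POLLER)
    PollerMetadata poller(ThreadPoolTaskExecutor pollerExecutor, ErrorHandler errorHandler) {
        return Pollers.fixedRate(100)
                .maxMessagesPerPoll(1)
                .taskExecutor(pollerExecutor) // the scheduler thread only hands the poll off
                .errorHandler(errorHandler)
                .get();
    }
}
```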
You need to take care, though, that you don't poll faster than the pool can keep up; otherwise the task executor's work queue will fill up.
You can solve that problem by limiting the executor's queue size and setting the RejectedExecutionHandler to a ThreadPoolExecutor.CallerRunsPolicy, which will cause the poller thread itself to run the rejected task.
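The effect of a bounded queue combined with CallerRunsPolicy can be seen with plain java.util.concurrent, without Spring. The sketch below is only an illustration: the class name, the pool of one thread, and the queue capacity of one are chosen to force a rejection quickly. The first task occupies the only worker, the second fills the queue, and the third is rejected and therefore runs on the submitting thread.

```java
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CallerRunsDemo {

    /** Submits three tasks and returns, per task number, the name of the thread that ran it. */
    public static Map<Integer, String> run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1),                // bounded work queue
                r -> new Thread(r, "worker"),               // single, named worker thread
                new ThreadPoolExecutor.CallerRunsPolicy()); // rejected tasks run on the caller

        CountDownLatch release = new CountDownLatch(1);
        Map<Integer, String> ranOn = new ConcurrentHashMap<>();

        // Task 1 occupies the only worker until the latch is released.
        pool.execute(() -> {
            ranOn.put(1, Thread.currentThread().getName());
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        // Task 2 fills the single queue slot.
        pool.execute(() -> ranOn.put(2, Thread.currentThread().getName()));
        // Task 3 is rejected, so CallerRunsPolicy runs it right here on the submitting thread.
        pool.execute(() -> ranOn.put(3, Thread.currentThread().getName()));

        release.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return ranOn;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In the poller scenario the submitting thread is the scheduler thread, so while it is busy running the rejected task no further polls are triggered, which throttles polling to what the pool can handle.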
Alternatively, you can use an ExecutorChannel configured with the task executor and avoid the poller altogether. The controller thread will simply hand off to the task executor, but you'll still need a CallerRunsPolicy.
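A minimal sketch of the ExecutorChannel variant, again assuming the Spring Integration 5.x Java DSL from the question. The flowExecutor bean, its sizes, the class name, and the placeholder handler are hypothetical; the handler only stands in for the flow steps elided in the question.

```java
import java.util.concurrent.ThreadPoolExecutor;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
class ExecutorChannelConfig { // hypothetical class name

    @Bean
    ThreadPoolTaskExecutor flowExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(5);    // illustrative sizes
        executor.setMaxPoolSize(5);
        executor.setQueueCapacity(500); // bounded, like the queue channel in the question
        // When the queue is full, the sending (controller) thread runs the flow itself.
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        return executor;
    }

    @Bean
    IntegrationFlow f(ThreadPoolTaskExecutor flowExecutor) {
        // No poller involved: each send is dispatched on one of the executor's threads.
        return IntegrationFlows.from(MessageChannels.executor("f.input", flowExecutor))
                .handle(message -> System.out.println(message.getPayload())) // placeholder step
                .get();
    }
}
```

With this setup the gateway call returns as soon as the task is queued, and only blocks the controller thread when the executor is saturated.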
Upvotes: 2