Reputation: 175
Could somebody help me rewrite this flow to use a thread pool? The code below works, but it relies on a fixed-delay poller to service incoming files:
@Bean
public IntegrationFlow sampleFlow() {
    return IntegrationFlows
            .from(fileReadingMessageSource(), c -> c.poller(Pollers.fixedDelay(500)))
            .channel(new DirectChannel())
            .transform(fileMessageToJobRequest())
            .handle(springBatchJobLauncher())
            .handle(jobExecution -> {
                logger.info("jobExecution payload: {}", jobExecution.getPayload());
            })
            .get();
}
Multiple threads are needed because files arrive at a high rate.
Upvotes: 0
Views: 2339
Reputation: 175
Thanks @Artem. I found a solution based on Artem's answer. The trick is to use a TaskExecutor, as in the code below. In addition, maxMessagesPerPoll(nbfiles) should be set on the poller so that more than one message (= file) is processed per poll.
@Bean
public IntegrationFlow sampleFlow() {
    return IntegrationFlows
            // Pick up at most 5 files per poll, every 5 seconds.
            .from(fileReadingMessageSource(), c -> c.poller(Pollers.fixedDelay(5000).maxMessagesPerPoll(5)))
            // Hand each message to the thread pool instead of the polling thread.
            .channel(MessageChannels.executor(threadPoolTaskExecutor()))
            .transform(fileMessageToJobRequest())
            .handle(springBatchJobLauncher())
            .handle(jobExecution -> {
                logger.debug("jobExecution payload: {}", jobExecution.getPayload());
            })
            .get();
}
@Bean
public TaskExecutor threadPoolTaskExecutor() {
    // The logged size now matches the size that is actually configured.
    int poolSize = 5;
    logger.debug("...... creating ThreadPool of size {}.", poolSize);
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setThreadNamePrefix("Dama_Thread_");
    executor.setCorePoolSize(poolSize);
    executor.setMaxPoolSize(poolSize);
    executor.setQueueCapacity(22);
    return executor;
}
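To illustrate why both settings matter, here is a minimal plain-JDK sketch of the same idea. It is not Spring Integration code and not how the framework is implemented. The class name PollHandoffSketch, the method pollOnce, and the file names are made up for the example. Each poll cycle takes at most maxMessagesPerPoll items, and each item is handed to a pool thread rather than being processed on the polling thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical illustration only: a hand-rolled stand-in for "poller + executor channel".
final class PollHandoffSketch {

    /**
     * One poll cycle: take at most maxMessagesPerPoll items from the source and
     * submit each one to the pool. Returns the handler results in submission order.
     * The sketch waits for the results only so that it can return them.
     */
    static List<String> pollOnce(Queue<String> source, int maxMessagesPerPoll,
                                 ExecutorService pool) throws Exception {
        List<Future<String>> pending = new ArrayList<>();
        for (int i = 0; i < maxMessagesPerPoll; i++) {
            String file = source.poll();
            if (file == null) {
                break; // source is empty, end this poll cycle early
            }
            // Hand off to a pool thread; the polling thread does not run the handler.
            pending.add(pool.submit(() -> "processed " + file));
        }
        List<String> results = new ArrayList<>();
        for (Future<String> f : pending) {
            results.add(f.get());
        }
        return results;
    }

    public static void main(String[] args) throws Exception {
        Queue<String> incoming = new ConcurrentLinkedQueue<>();
        for (int i = 1; i <= 7; i++) {
            incoming.add("file" + i + ".csv");
        }
        ExecutorService pool = Executors.newFixedThreadPool(5);
        try {
            System.out.println(pollOnce(incoming, 5, pool)); // first 5 files
            System.out.println(pollOnce(incoming, 5, pool)); // remaining 2 files
        } finally {
            pool.shutdown();
        }
    }
}
```

With a limit of 1 per cycle, the seven files in this sketch would need seven poll cycles, however many threads the pool has. That is why the limit is raised together with adding the executor.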
Upvotes: 1
Reputation: 121262
The poller can be configured with this option:
/**
 * Specify an {@link Executor} to perform the {@code pollingTask}.
 * @param taskExecutor the {@link Executor} to use.
 * @return the spec.
 */
public PollerSpec taskExecutor(Executor taskExecutor) {
There you can provide a ThreadPoolTaskExecutor instance.
Upvotes: 0