8xomaster
8xomaster

Reputation: 175

How to use Spring Integration to setup a ThreadPool to process a file message source?

Could somebody help me rewrite this flow using a thread pool ? The below code works, but uses a fixed delay to service incoming files:

@Bean
public IntegrationFlow sampleFlow() {
    return IntegrationFlows
            .from(fileReadingMessageSource(), c -> c.poller(Pollers.fixedDelay(500)))
            .channel(new DirectChannel())
            .transform(fileMessageToJobRequest())
            .handle(springBatchJobLauncher())
            .handle(jobExecution -> {
                logger.info("jobExecution payload: {}", jobExecution.getPayload());
            })
            .get();
}

Threads are needed because files are coming in a quick rate.

Upvotes: 0

Views: 2339

Answers (2)

8xomaster
8xomaster

Reputation: 175

Thanks @Artem. I found the solution based on Artem's answer. The trick is using the TaskExecutor in the code below. Also Pollers.maxMessagesPerPoll(nbfiles) should be set to process more than one message (=file) at a time.

  @Bean
  public IntegrationFlow sampleFlow() throws InterruptedException {
    return IntegrationFlows
          .from(fileReadingMessageSource(), c -> c.poller(Pollers.fixedDelay(5000).maxMessagesPerPoll(5)))
          .channel(MessageChannels.executor(threadPoolTaskExecutor()))
          .transform(fileMessageToJobRequest())
          .handle(springBatchJobLauncher())
          .handle(jobExecution -> {
            logger.debug("jobExecution payload: {}", jobExecution.getPayload());
          })
          .get();
  }

  @Bean
  public TaskExecutor threadPoolTaskExecutor() {
    int poolSize = 20;
    logger.debug("...... createing ThreadPool of size {}.", poolSize);
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setThreadNamePrefix("Dama_Thread_");
    executor.setMaxPoolSize(5);
    executor.setCorePoolSize(5);
    executor.setQueueCapacity(22);
    return executor;
  }

Upvotes: 1

Artem Bilan
Artem Bilan

Reputation: 121262

The poller can be configured with this option:

    /**
 * Specify an {@link Executor} to perform the {@code pollingTask}.
 * @param taskExecutor the {@link Executor} to use.
 * @return the spec.
 */
public PollerSpec taskExecutor(Executor taskExecutor) {

Where you really can provide a ThreadPoolTaskExecutor instance.

Upvotes: 0

Related Questions