user311633
user311633

Reputation: 173

How to run spring integration with multithread

I would like to do the following with spring integration

  1. Fetch files from sftp
  2. Send the downloaded files to http and also to s3

Here is what I have so far.

@Bean
@InboundChannelAdapter(channel = "sftpChannel", poller = @Poller(fixedDelay = "100000", maxMessagesPerPoll = "3"))
    public MessageSource<File> sftpMessageSource() {
        SftpInboundFileSynchronizingMessageSource source = new SftpInboundFileSynchronizingMessageSource(sftpInboundFileSynchronizer());
        source.setLocalDirectory(new File("sftp-inbound"));
        source.setAutoCreateLocalDirectory(true);
        source.setMaxFetchSize(2);
        return source;
    }

Here is my service activator. The problem with my service activator is this run in the same thread as the poller, so when the file process too long, it doesn't process the next until the first one is done.

@ServiceActivator(inputChannel = "sftpChannel")
        public void sftpChannel(@Payload File payload, @Header("timestamp") long timestamp) {
            log.info("Message arrived at sftpChannel");
         //do something with file
    }

How can I run the file process on separate thread and release the poller thread instead, so the poller can continue to pull files from sftp?

Upvotes: 1

Views: 5156

Answers (2)

Artem Bilan
Artem Bilan

Reputation: 121247

Something like this:

@Bean
public ThreadPoolTaskExecutor executor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setMaxPoolSize(5);
    return executor; 
}

And use that executor as a bean name in the @Poller of the @InboundChannelAdapter:

@Bean
@InboundChannelAdapter(channel = "sftpChannel", 
        poller = @Poller(fixedDelay = "100000", maxMessagesPerPoll = "3", taskExecutor="executor"))

See @Poller JavaDocs:

/**
 * @return The {@link org.springframework.core.task.TaskExecutor} bean name.
 */
String taskExecutor() default "";

And also see docs in the Reference Manual: https://docs.spring.io/spring-integration/docs/5.0.9.RELEASE/reference/html/messaging-channels-section.html#conditional-pollers

Important: Async Handoff

This advice modifies the trigger based on the receive() result. This will only work if the advice is called on the poller thread. It will not work if the poller has a task-executor. To use this advice where you wish to use async operations after the result of a poll, do the async handoff later, perhaps by using an ExecutorChannel.

Upvotes: 2

Alain Cruz
Alain Cruz

Reputation: 5097

You can use the @Async annotation to run any method in a separate thread. You just need to add @EnableAsync in any @Configuration file and when you call it, it will run asynchronously. You can find more information in this blog.

Upvotes: 0

Related Questions