Reputation: 173
I would like to do the following with Spring Integration
Here is what I have so far.
@Bean
@InboundChannelAdapter(channel = "sftpChannel", poller = @Poller(fixedDelay = "100000", maxMessagesPerPoll = "3"))
public MessageSource<File> sftpMessageSource() {
    SftpInboundFileSynchronizingMessageSource source = new SftpInboundFileSynchronizingMessageSource(sftpInboundFileSynchronizer());
    source.setLocalDirectory(new File("sftp-inbound"));
    source.setAutoCreateLocalDirectory(true);
    source.setMaxFetchSize(2);
    return source;
}
Here is my service activator. The problem is that it runs on the same thread as the poller, so when processing a file takes too long, the next file is not processed until the first one is done.
@ServiceActivator(inputChannel = "sftpChannel")
public void sftpChannel(@Payload File payload, @Header("timestamp") long timestamp) {
    log.info("Message arrived at sftpChannel");
    // do something with file
}
How can I run the file processing on a separate thread and release the poller thread, so the poller can continue to pull files from SFTP?
Upvotes: 1
Views: 5156
Reputation: 121247
Something like this:
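@Bean
public ThreadPoolTaskExecutor executor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    // The default core size is 1 and the default queue is unbounded, so the pool
    // would never grow to maxPoolSize; set the core size as well.
    executor.setCorePoolSize(5);
    executor.setMaxPoolSize(5);
    return executor;
}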
And use that executor as a bean name in the @Poller of the @InboundChannelAdapter:
@Bean
@InboundChannelAdapter(channel = "sftpChannel",
poller = @Poller(fixedDelay = "100000", maxMessagesPerPoll = "3", taskExecutor="executor"))
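A note on sizing the executor bean: ThreadPoolTaskExecutor wraps a java.util.concurrent.ThreadPoolExecutor, which only starts threads beyond its core size once its queue is full. With Spring's defaults (core size 1, unbounded queue), raising only the maximum pool size may leave a single worker thread. The following plain-JDK sketch illustrates that behaviour. The class and method names are made up for the demonstration, and it does not use Spring at all.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolGrowthDemo {

    // Submits `tasks` blocking jobs to a pool with an unbounded queue and
    // returns how many worker threads the pool started for them.
    static int threadsStarted(int corePoolSize, int maxPoolSize, int tasks) throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                corePoolSize, maxPoolSize, 60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>()); // unbounded queue, as with Spring's default queue capacity
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // stands in for a long-running file job
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return pool.getPoolSize();
        } finally {
            release.countDown(); // let the jobs finish
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("core=1, max=5 -> " + threadsStarted(1, 5, 5) + " thread(s)");
        System.out.println("core=5, max=5 -> " + threadsStarted(5, 5, 5) + " thread(s)");
    }
}
```

With a core size of 1 the extra jobs wait in the queue behind the first one. With a core size of 5 each job gets its own thread, which is usually what is wanted when several files should be processed in parallel.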
See the @Poller JavaDocs:
/**
 * @return The {@link org.springframework.core.task.TaskExecutor} bean name.
 */
String taskExecutor() default "";
And also see docs in the Reference Manual: https://docs.spring.io/spring-integration/docs/5.0.9.RELEASE/reference/html/messaging-channels-section.html#conditional-pollers
Important: Async Handoff
This advice modifies the trigger based on the receive() result. This will only work if the advice is called on the poller thread. It will not work if the poller has a task-executor. To use this advice where you wish to use async operations after the result of a poll, do the async handoff later, perhaps by using an ExecutorChannel.
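A minimal sketch of the ExecutorChannel variant mentioned in that note, assuming the channel name sftpChannel from the question. The configuration class name, the fileProcessingExecutor bean name and the pool size of 5 are placeholders chosen for illustration. Declaring the channel explicitly replaces the direct channel that would otherwise be created for that name, so the poller only fetches and sends, and the service activator runs on a pool thread.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.ExecutorChannel;
import org.springframework.messaging.MessageChannel;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
public class SftpChannelConfig {

    // Hypothetical executor dedicated to file processing; sizes are illustrative.
    @Bean
    public ThreadPoolTaskExecutor fileProcessingExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(5);
        executor.setMaxPoolSize(5);
        executor.setThreadNamePrefix("sftp-file-");
        return executor;
    }

    // Messages sent to "sftpChannel" are handed to the executor, so the
    // subscriber (the @ServiceActivator) no longer runs on the poller thread.
    @Bean
    public MessageChannel sftpChannel() {
        return new ExecutorChannel(fileProcessingExecutor());
    }
}
```

In this arrangement the @Poller keeps no taskExecutor attribute, so any poller advice still runs on the poller thread, as the note requires.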
Upvotes: 2
Reputation: 5097
You can use the @Async annotation to run any method in a separate thread. You just need to add @EnableAsync to any @Configuration class, and the annotated method will then run asynchronously when it is called. You can find more information in this blog.
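A rough sketch of how that could look, assuming the slow work is moved into a separate bean. FileProcessor, process and the configuration class name are hypothetical names, not taken from the question.

```java
import java.io.File;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.EnableAsync;

@Configuration
@EnableAsync
public class AsyncFileProcessingConfig {

    @Bean
    public FileProcessor fileProcessor() {
        return new FileProcessor();
    }

    // Hypothetical bean holding the slow work.
    public static class FileProcessor {

        // When called through the Spring-managed bean, this returns to the
        // caller immediately and the body runs on an async executor thread.
        @Async
        public void process(File file) {
            // do something with file
        }
    }
}
```

The service activator would then have FileProcessor injected and call process(payload). The call has to come from another bean, because @Async is applied through a proxy and a method calling it on the same object bypasses that proxy.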
Upvotes: 0