Reputation: 1
In my project I used a DirectChannel bean, and it works perfectly. The new requirement is to upload files over SFTP asynchronously and in parallel using an ExecutorChannel, but with that change the files are not transferred.
@Bean
public IntegrationFlow sftpOutboundFlow(DelegatingSessionFactory lDelegatingSessionFactory) {
    getRemotePath(lDelegatingSessionFactory);
    return IntegrationFlow.from("outboundSftpChannel")
            .enrichHeaders(lHeader -> lHeader.header("remoteDirectory", lRemoteDirectoryWithHostName))
            // Here I have added the errorChannel
            .enrichHeaders(h -> h.header(MessageHeaders.ERROR_CHANNEL, "errorChannel"))
            .handle(Sftp.outboundAdapter(lDelegatingSessionFactory, FileExistsMode.REPLACE)
                    .remoteDirectoryExpression("headers['remoteDirectory']"))
            .get();
}
@Bean
public MessageChannel outboundSftpChannel() {
    // return new DirectChannel(); // this worked fine; below is the new implementation
    ExecutorChannel executorChannel = new ExecutorChannel(taskExecutor());
    // executorChannel.addInterceptor(new ThreadLoggingInterceptor());
    return executorChannel;
}

@Bean
public TaskExecutor taskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(5); // Minimum number of threads in the pool
    executor.setMaxPoolSize(10); // Maximum number of threads in the pool
    executor.setQueueCapacity(25); // Queue capacity for pending tasks
    executor.setThreadNamePrefix("AsyncExecutor-"); // Prefix for thread names
    executor.setWaitForTasksToCompleteOnShutdown(true); // Ensures tasks complete on shutdown
    executor.setAwaitTerminationSeconds(60); // Timeout for waiting for tasks to complete
    executor.initialize(); // Initializes the thread pool
    return executor;
}
I am not getting any exception either, but no new file appears in the destination folder. What is the likely cause, and how can I fix it?
Upvotes: 0
Views: 22
Reputation: 121552
The DelegatingSessionFactory is based on a ThreadLocal. So if you set the key on that factory before you send to the outboundSftpChannel, it is no surprise that the DelegatingSessionFactory does not work properly on the consumer side: the ExecutorChannel hands the message to a different thread, and that thread's ThreadLocal does not have the required key.
It is also not clear what you do in the getRemotePath(lDelegatingSessionFactory) method.
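To illustrate the ThreadLocal point, here is a minimal plain-Java sketch with no Spring involved. The class name ThreadLocalKeyDemo, the SESSION_KEY field, and the "hostA" value are hypothetical stand-ins for whatever key the factory keeps per thread; this is not the DelegatingSessionFactory implementation, only the same visibility rule in isolation.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadLocalKeyDemo {

    // Hypothetical stand-in for the per-thread key a ThreadLocal-based factory keeps.
    static final ThreadLocal<String> SESSION_KEY = new ThreadLocal<>();

    // Reads the key on a pool thread without setting it there first.
    static String readOnWorker(ExecutorService pool) throws Exception {
        Callable<String> task = () -> SESSION_KEY.get();
        return pool.submit(task).get();
    }

    // Sets the key on the pool thread itself, reads it, then clears it again.
    static String readOnWorkerAfterSetting(ExecutorService pool, String key) throws Exception {
        Callable<String> task = () -> {
            SESSION_KEY.set(key);
            try {
                return SESSION_KEY.get();
            } finally {
                SESSION_KEY.remove(); // pool threads are reused, so do not leak the key
            }
        };
        return pool.submit(task).get();
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            SESSION_KEY.set("hostA"); // set on the calling thread only
            System.out.println("caller: " + SESSION_KEY.get());
            System.out.println("worker, key not set there: " + readOnWorker(pool));
            System.out.println("worker, key set there: "
                    + readOnWorkerAfterSetting(pool, "hostA"));
        } finally {
            SESSION_KEY.remove();
            pool.shutdown();
        }
    }
}
```

The first worker read returns null because the value was stored only for the calling thread. In the flow from the question, the corresponding change would presumably be to select the key on the executor side of outboundSftpChannel, for example through the factory's setThreadKey(...) and clearThreadKey() methods in a step just before and after the SFTP adapter, rather than before the message is sent. Whether that fits depends on what getRemotePath actually does.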
Upvotes: 0