Ajitesh Mandal

Reputation: 1

Using ExecutorChannel instead of DirectChannel for SFTP upload with Spring Boot and Spring Integration

In my project I used a DirectChannel bean, and it works perfectly. The new requirement is to perform the SFTP uploads asynchronously and in parallel using an ExecutorChannel, but with that change the files are no longer transferred.

    @Bean
    public IntegrationFlow sftpOutboundFlow(
            DelegatingSessionFactory lDelegatingSessionFactory)
    {
        getRemotePath(lDelegatingSessionFactory);
        return IntegrationFlow.from("outboundSftpChannel")
                .enrichHeaders(lHeader -> lHeader.header("remoteDirectory",
                        lRemoteDirectoryWithHostName))
                // Here I have added the errorChannel
                .enrichHeaders(h -> h.header(MessageHeaders.ERROR_CHANNEL, "errorChannel"))
                .handle(Sftp.outboundAdapter(lDelegatingSessionFactory,
                        FileExistsMode.REPLACE)
                        .remoteDirectoryExpression("headers['remoteDirectory']"))
                .get();
    }

    @Bean
    public MessageChannel outboundSftpChannel()
    {
        // return new DirectChannel();   // it was working fine, below is the new implementation
        ExecutorChannel executorChannel = new ExecutorChannel(taskExecutor());
        // executorChannel.addInterceptor(new ThreadLoggingInterceptor());
        return executorChannel;
    }

    @Bean
    public TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(5);  // Minimum number of threads in the pool
        executor.setMaxPoolSize(10);  // Maximum number of threads in the pool
        executor.setQueueCapacity(25);  // Queue capacity for pending tasks
        executor.setThreadNamePrefix("AsyncExecutor-");  // Prefix for thread names
        executor.setWaitForTasksToCompleteOnShutdown(true);  // Ensures tasks complete on shutdown
        executor.setAwaitTerminationSeconds(60);  // Timeout for waiting for tasks to complete
        executor.initialize();  // Initializes the thread pool
        return executor;
    }

I don't get any exception either, but no new file appears in the destination folder. What could be the reason, and how can I fix it?

Upvotes: 0

Views: 22

Answers (1)

Artem Bilan

Reputation: 121552

The DelegatingSessionFactory stores its current key in a ThreadLocal. So if you set the key on that factory before you send to the outboundSftpChannel, it is no surprise that the DelegatingSessionFactory does not work properly on the consumer side: with an ExecutorChannel the handler runs on a different thread, and that thread's ThreadLocal does not hold the required key.
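
To make the thread problem concrete, here is a minimal plain-JDK sketch that does not use Spring at all. The class ThreadLocalKeyDemo, the THREAD_KEY field and the "hostA" value are hypothetical stand-ins for the per-thread key inside the factory, not real Spring Integration API. It shows that a key set on the sending thread is not visible on a pool thread, while a key set on the pool thread itself is.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadLocalKeyDemo {

    // Hypothetical stand-in for the per-thread key kept by the session factory.
    private static final ThreadLocal<String> THREAD_KEY = new ThreadLocal<>();

    // Sets the key on the calling thread, then reads it on a worker thread.
    public static String keySetByCaller(ExecutorService pool) throws Exception {
        THREAD_KEY.set("hostA");
        try {
            Callable<String> readOnWorker = THREAD_KEY::get;
            return pool.submit(readOnWorker).get();
        } finally {
            THREAD_KEY.remove(); // clean up the calling thread
        }
    }

    // Sets, reads and clears the key entirely on the worker thread.
    public static String keySetOnWorker(ExecutorService pool, String key) throws Exception {
        Callable<String> setAndRead = () -> {
            THREAD_KEY.set(key);
            try {
                return THREAD_KEY.get();
            } finally {
                THREAD_KEY.remove(); // pooled threads are reused, so clear the key
            }
        };
        return pool.submit(setAndRead).get();
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            System.out.println("set by caller, read on worker: " + keySetByCaller(pool));
            System.out.println("set on worker, read on worker: " + keySetOnWorker(pool, "hostA"));
        } finally {
            pool.shutdown();
        }
    }
}
```

The first line prints null and the second prints hostA. Applied to your flow, this suggests setting the key (and clearing it afterwards) in a step that runs after the ExecutorChannel, on the same thread as the SFTP handler, for example by carrying the key in a message header.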

It is also not clear what you do in the getRemotePath(lDelegatingSessionFactory) method.

Upvotes: 0
