rcaschultz

Reputation: 173

Spring SFTP Outbound Adapter - determining when files have been sent

I have a Spring SFTP outbound adapter that I start via "adapter.start()" in my main program. Once started, the adapter uploads all the files in the specified directory as expected. But I want to stop the adapter after all the files have been transferred. How do I detect that all the files have been transferred so I can call adapter.stop()?

@Bean
public IntegrationFlow sftpOutboundFlow() {
    return IntegrationFlows.from(Files.inboundAdapter(new File(sftpOutboundDirectory))
                    .filterExpression("name.endsWith('.pdf') OR name.endsWith('.PDF')")
                    .preventDuplicates(true),
            e -> e.id("sftpOutboundAdapter")
                    .autoStartup(false)
                    .poller(Pollers.trigger(new FireOnceTrigger())
                            .maxMessagesPerPoll(-1)))
            .log(LoggingHandler.Level.INFO, "sftp.outbound", m -> m.getPayload())
            .log(LoggingHandler.Level.INFO, "sftp.outbound", m -> m.getHeaders())
            .handle(Sftp.outboundAdapter(outboundSftpSessionFactory())
                    .useTemporaryFileName(false)
                    .remoteDirectory(sftpRemoteDirectory))
            .get();
}

Upvotes: 2

Views: 869

Answers (2)

Kenan Güler

Reputation: 2013

@Artem Bilan has already given the answer, but here is a concrete implementation of what he described, for those who are new to Spring Integration like me:

  1. Define a service to get the PDF files on demand:
@Service
public class MyFileService {
    public List<File> getPdfFiles(final String srcDir) {
        File[] files = new File(srcDir).listFiles((dir, name) -> name.toLowerCase().endsWith(".pdf"));
        return Arrays.asList(files == null ? new File[]{} : files);
    }
}
  2. Define a Gateway to start the SFTP upload flow on demand:
@MessagingGateway
public interface SFtpOutboundGateway {
    @Gateway(requestChannel = "sftpOutboundFlow.input")
    void uploadFiles(List<File> files);
}
  3. Define the Integration Flow to upload the files to the SFTP server via Sftp.outboundGateway:
@Configuration
@EnableIntegration
public class FtpFlowIntegrationConfig {
    // could also be bound via @Value
    private String sftpRemoteDirectory = "/path/to/remote/dir";

    @Bean
    public SessionFactory<ChannelSftp.LsEntry> outboundSftpSessionFactory() {
        DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
        factory.setHost("localhost");
        factory.setPort(22222);
        factory.setUser("client1");
        factory.setPassword("password123");
        factory.setAllowUnknownKeys(true);
        return new CachingSessionFactory<>(factory);
    }

    @Bean
    public IntegrationFlow sftpOutboundFlow(RemoteFileTemplate<ChannelSftp.LsEntry> remoteFileTemplate) {
        return e -> e
                .log(LoggingHandler.Level.INFO, "sftp.outbound", Message::getPayload)
                .log(LoggingHandler.Level.INFO, "sftp.outbound", Message::getHeaders)
                .handle(
                    Sftp.outboundGateway(remoteFileTemplate, AbstractRemoteFileOutboundGateway.Command.MPUT, "payload")
                );
    }

    @Bean
    public RemoteFileTemplate<ChannelSftp.LsEntry> remoteFileTemplate(SessionFactory<ChannelSftp.LsEntry> outboundSftpSessionFactory) {
        RemoteFileTemplate<ChannelSftp.LsEntry> template = new SftpRemoteFileTemplate(outboundSftpSessionFactory);
        template.setRemoteDirectoryExpression(new LiteralExpression(sftpRemoteDirectory));
        template.setAutoCreateDirectory(true);
        template.setUseTemporaryFileName(false);
        // call afterPropertiesSet() last, once every property has been set
        template.afterPropertiesSet();
        return template;
    }
}

Wiring up:

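public class SpringApp {
    public static void main(String[] args) {
        // Assumption: the context is built from the configuration class above, and
        // MyFileService and SFtpOutboundGateway are registered in it
        // (e.g. via component scanning and @IntegrationComponentScan).
        final ConfigurableApplicationContext ctx =
                new AnnotationConfigApplicationContext(FtpFlowIntegrationConfig.class);
        final MyFileService fileService = ctx.getBean(MyFileService.class);
        final SFtpOutboundGateway sFtpOutboundGateway = ctx.getBean(SFtpOutboundGateway.class);
        // trigger the sftp upload flow manually - only once
        // ("/path/to/local/dir" is a placeholder for the local source directory)
        sFtpOutboundGateway.uploadFiles(fileService.getPdfFiles("/path/to/local/dir"));
        ctx.close();
    }
}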
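
The gateway uploads whatever list the file lookup hands it, so it can help to check that lookup on its own first. Below is a minimal plain-Java sketch, assuming a hypothetical class name PdfListingSketch and a hypothetical helper createSampleDir; the getPdfFiles body is copied from MyFileService above so that it runs without Spring. It shows that the filter is case-insensitive and that a missing directory yields an empty list rather than an exception.

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;

public class PdfListingSketch {

    // Same logic as MyFileService.getPdfFiles above, copied here so the sketch runs without Spring.
    static List<File> getPdfFiles(final String srcDir) {
        File[] files = new File(srcDir).listFiles((dir, name) -> name.toLowerCase().endsWith(".pdf"));
        // listFiles returns null when srcDir does not exist or is not a directory
        return Arrays.asList(files == null ? new File[]{} : files);
    }

    // Hypothetical helper: creates a temp directory holding two PDFs and one text file.
    static Path createSampleDir() {
        try {
            Path dir = Files.createTempDirectory("pdf-sketch");
            Files.createFile(dir.resolve("a.pdf"));
            Files.createFile(dir.resolve("B.PDF"));
            Files.createFile(dir.resolve("notes.txt"));
            return dir;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Path dir = createSampleDir();
        System.out.println(getPdfFiles(dir.toString()).size());                    // prints 2
        System.out.println(getPdfFiles(dir.resolve("missing").toString()).size()); // prints 0
    }
}
```

An empty list is therefore a case worth handling before calling uploadFiles, for example by skipping the gateway call when there is nothing to send.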

Important notes:

1.

@Gateway(requestChannel = "sftpOutboundFlow.input") void uploadFiles(List<File> files);

Here the DirectChannel sftpOutboundFlow.input is used to pass a message whose payload is the List<File> files to the receiver. If this channel has not been created yet, the gateway creates it implicitly.

2.

@Bean public IntegrationFlow sftpOutboundFlow(RemoteFileTemplate<ChannelSftp.LsEntry> remoteFileTemplate) { ... }

Since IntegrationFlow is a functional interface (it acts as a consumer of an IntegrationFlowDefinition), we can simplify the flow a little by writing it as a lambda. During the bean registration phase, the IntegrationFlowBeanPostProcessor converts this inline (lambda) IntegrationFlow to a StandardIntegrationFlow and processes its components. An IntegrationFlow defined as a lambda gets a DirectChannel as the inputChannel of the flow, and that channel is registered in the application context as a bean named sftpOutboundFlow.input in the sample above (flow bean name + ".input"). That's why we use that name in the SFtpOutboundGateway gateway.

Ref: https://spring.io/blog/2014/11/25/spring-integration-java-dsl-line-by-line-tutorial

3.

@Bean public RemoteFileTemplate<ChannelSftp.LsEntry> remoteFileTemplate(SessionFactory<ChannelSftp.LsEntry> outboundSftpSessionFactory) {}

see: Remote directory for sftp outbound gateway with DSL
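
The naming rule from note 2 (flow bean name + ".input") can be illustrated with a toy model in plain Java. This is not Spring Integration code: FlowDefinition, Flow, register and send are hypothetical stand-ins, written only to show the idea that the lambda merely configures a definition it is handed, and that the input channel name is derived from the bean name before the lambda runs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

public class LambdaFlowSketch {

    /** Toy stand-in for a flow definition: an input channel name plus ordered steps. */
    static final class FlowDefinition {
        final String inputChannelName;
        final List<UnaryOperator<String>> steps = new ArrayList<>();

        FlowDefinition(String inputChannelName) {
            this.inputChannelName = inputChannelName;
        }

        FlowDefinition handle(UnaryOperator<String> step) {
            steps.add(step);
            return this;
        }
    }

    /** Toy stand-in for a lambda-style flow: it only configures the definition it is given. */
    @FunctionalInterface
    interface Flow {
        void configure(FlowDefinition definition);
    }

    /** Mimics registration: derive the channel name from the bean name, then let the lambda add steps. */
    static FlowDefinition register(String beanName, Flow flow) {
        FlowDefinition definition = new FlowDefinition(beanName + ".input");
        flow.configure(definition);
        return definition;
    }

    /** Passes a payload through every step in order, on the caller's thread. */
    static String send(FlowDefinition definition, String payload) {
        String current = payload;
        for (UnaryOperator<String> step : definition.steps) {
            current = step.apply(current);
        }
        return current;
    }

    public static void main(String[] args) {
        FlowDefinition flow = register("sftpOutboundFlow", f -> f
                .handle(p -> p.trim())
                .handle(p -> "uploaded:" + p));
        System.out.println(flow.inputChannelName);      // prints sftpOutboundFlow.input
        System.out.println(send(flow, " report.pdf ")); // prints uploaded:report.pdf
    }
}
```

In the real framework the steps are message handlers and the channel is a bean in the application context, but the shape is the same: the name a gateway refers to exists because the flow bean was registered, not because the lambda declared it.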


Upvotes: 1

Artem Bilan

Reputation: 121550

"But I want to stop the adapter after all the files have been transferred."

This is not what this kind of component was designed for. Since your local directory is not constantly changing, it is probably better to think about an event-driven solution that lists the files in the directory in response to some action. Yes, that can be a call from main, but it happens only once, for the whole content of the directory, and that's all.

For this purpose there is Sftp.outboundGateway() with Command.MPUT:

https://docs.spring.io/spring-integration/reference/html/sftp.html#using-the-mput-command

You can still use an IntegrationFlow, but it can start from a @MessagingGateway interface that is called from main with the local directory whose files should be listed for uploading:

https://docs.spring.io/spring-integration/reference/html/dsl.html#java-dsl-gateway

Upvotes: 1
