Reputation: 173
I have a Spring SFTP output adapter that I start via "adapter.start()" in my main program. Once started, the adapter transfers and uploads all the files in the specified directory as expected. But I want to stop the adapter after all the files have been transferred. How do I detect if all the files have been transferred so I can issue an adapter.stop()?
@Bean
public IntegrationFlow sftpOutboundFlow() {
    return IntegrationFlows.from(Files.inboundAdapter(new File(sftpOutboundDirectory))
                    .filterExpression("name.endsWith('.pdf') OR name.endsWith('.PDF')")
                    .preventDuplicates(true),
            e -> e.id("sftpOutboundAdapter")
                    .autoStartup(false)
                    .poller(Pollers.trigger(new FireOnceTrigger())
                            .maxMessagesPerPoll(-1)))
            .log(LoggingHandler.Level.INFO, "sftp.outbound", m -> m.getPayload())
            .log(LoggingHandler.Level.INFO, "sftp.outbound", m -> m.getHeaders())
            .handle(Sftp.outboundAdapter(outboundSftpSessionFactory())
                    .useTemporaryFileName(false)
                    .remoteDirectory(sftpRemoteDirectory))
            .get();
}
Upvotes: 2
Views: 869
Reputation: 2013
@Artem Bilan has already given the answer. But here is a concrete implementation of what he said, for those who are Spring Integration noobs like me:
@Service
public class MyFileService {
    public List<File> getPdfFiles(final String srcDir) {
        File[] files = new File(srcDir).listFiles((dir, name) -> name.toLowerCase().endsWith(".pdf"));
        return Arrays.asList(files == null ? new File[]{} : files);
    }
}
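As a variation on getPdfFiles above, here is a minimal sketch using java.nio that also skips directories whose names happen to end in ".pdf" and returns the files in a stable order. The class name PdfLister and the method name listPdfFiles are hypothetical and not part of the original answer; the behavior for a missing directory (an empty list) is an assumption chosen to match the null check above.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper, not part of the original answer.
public class PdfLister {

    // Lists regular files directly inside srcDir whose name ends with ".pdf", ignoring case.
    public static List<File> listPdfFiles(Path srcDir) throws IOException {
        if (!Files.isDirectory(srcDir)) {
            // Missing path or not a directory: nothing to upload.
            return Collections.emptyList();
        }
        // Files.list returns a lazy stream that must be closed to release the directory handle.
        try (Stream<Path> entries = Files.list(srcDir)) {
            return entries
                    .filter(Files::isRegularFile)
                    .filter(p -> p.getFileName().toString().toLowerCase(Locale.ROOT).endsWith(".pdf"))
                    .sorted()
                    .map(Path::toFile)
                    .collect(Collectors.toList());
        }
    }
}
```

The result is still a List<File>, so it could be passed to a gateway method such as uploadFiles(List<File> files) unchanged.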
@MessagingGateway
public interface SFtpOutboundGateway {
    @Gateway(requestChannel = "sftpOutboundFlow.input")
    void uploadFiles(List<File> files);
}
Flow configuration using Sftp.outboundGateway:
@Configuration
@EnableIntegration
public class FtpFlowIntegrationConfig {

    // could be also bound via @Value
    private String sftpRemoteDirectory = "/path/to/remote/dir";

    @Bean
    public SessionFactory<ChannelSftp.LsEntry> outboundSftpSessionFactory() {
        DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
        factory.setHost("localhost");
        factory.setPort(22222);
        factory.setUser("client1");
        factory.setPassword("password123");
        factory.setAllowUnknownKeys(true);
        return new CachingSessionFactory<>(factory);
    }

    @Bean
    public IntegrationFlow sftpOutboundFlow(RemoteFileTemplate<ChannelSftp.LsEntry> remoteFileTemplate) {
        return e -> e
                .log(LoggingHandler.Level.INFO, "sftp.outbound", Message::getPayload)
                .log(LoggingHandler.Level.INFO, "sftp.outbound", Message::getHeaders)
                .handle(
                        Sftp.outboundGateway(remoteFileTemplate, AbstractRemoteFileOutboundGateway.Command.MPUT, "payload")
                );
    }

    @Bean
    public RemoteFileTemplate<ChannelSftp.LsEntry> remoteFileTemplate(SessionFactory<ChannelSftp.LsEntry> outboundSftpSessionFactory) {
        RemoteFileTemplate<ChannelSftp.LsEntry> template = new SftpRemoteFileTemplate(outboundSftpSessionFactory);
        template.setRemoteDirectoryExpression(new LiteralExpression(sftpRemoteDirectory));
        template.setAutoCreateDirectory(true);
        template.afterPropertiesSet();
        template.setUseTemporaryFileName(false);
        return template;
    }
}
Wiring up:
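@SpringBootApplication // assumption: a Spring Boot application; the original did not show how "ctx" is created
public class SpringApp {
    public static void main(String[] args) {
        final ConfigurableApplicationContext ctx = SpringApplication.run(SpringApp.class, args);
        final MyFileService fileService = ctx.getBean(MyFileService.class);
        final SFtpOutboundGateway sFtpOutboundGateway = ctx.getBean(SFtpOutboundGateway.class);
        // trigger the sftp upload flow manually - only once
        // "/path/to/local/dir" is a placeholder for the local source directory
        sFtpOutboundGateway.uploadFiles(fileService.getPdfFiles("/path/to/local/dir"));
    }
}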
Important notes:
1. @Gateway(requestChannel = "sftpOutboundFlow.input") void uploadFiles(List<File> files);
Here the DirectChannel named sftpOutboundFlow.input is used to pass a message with the payload (= List<File> files) to the receiver. If this channel has not been created yet, the gateway creates it implicitly.
2. @Bean public IntegrationFlow sftpOutboundFlow(RemoteFileTemplate<ChannelSftp.LsEntry> remoteFileTemplate) { ... }
Since IntegrationFlow is a Consumer functional interface, we can simplify the flow a little using the IntegrationFlowDefinition. During the bean registration phase, the IntegrationFlowBeanPostProcessor converts this inline (lambda) IntegrationFlow to a StandardIntegrationFlow and processes its components. An IntegrationFlow defined with a lambda gets a DirectChannel as the inputChannel of the flow, and that channel is registered in the application context as a bean named sftpOutboundFlow.input in the sample above (flow bean name + ".input"). That is why we use that name as the requestChannel of the SFtpOutboundGateway gateway.
Ref: https://spring.io/blog/2014/11/25/spring-integration-java-dsl-line-by-line-tutorial
3. @Bean public RemoteFileTemplate<ChannelSftp.LsEntry> remoteFileTemplate(SessionFactory<ChannelSftp.LsEntry> outboundSftpSessionFactory) {}
See: Remote directory for sftp outbound gateway with DSL
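To make notes 1 and 2 more tangible, here is a toy model in plain Java of the two ideas involved: the input channel of a lambda flow is addressed as flow bean name + ".input", and a direct channel hands the message to its subscriber in the caller's thread. ToyDirectChannels, registerFlow and send are hypothetical names invented for this illustration; none of this is Spring Integration's actual implementation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Toy model only: it mimics the naming convention and caller-thread dispatch,
// not Spring Integration's real classes.
public class ToyDirectChannels {

    private final Map<String, Consumer<Object>> handlersByChannel = new HashMap<>();

    // Registers a flow's handler under "<flowBeanName>.input", mirroring the lambda-flow convention.
    public void registerFlow(String flowBeanName, Consumer<Object> handler) {
        handlersByChannel.put(flowBeanName + ".input", handler);
    }

    // Delivers the payload to the subscriber of the named channel.
    // The handler runs in the caller's thread, so send() returns only after it has finished.
    public void send(String channelName, Object payload) {
        Consumer<Object> handler = handlersByChannel.get(channelName);
        if (handler == null) {
            throw new IllegalStateException("No subscriber for channel " + channelName);
        }
        handler.accept(payload);
    }
}
```

In this model, registering a handler with registerFlow("sftpOutboundFlow", ...) and then calling send("sftpOutboundFlow.input", files) plays the role of the gateway call; sending to a name that no flow registered fails, which is why the requestChannel name has to match the flow bean name.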
Upvotes: 1
Reputation: 121550
"But I want to stop the adapter after all the files have been transferred."
Logically, this is not what this kind of component was designed for. Since your local directory is not going to change constantly, it is probably better to think about an event-driven solution that lists the files in the directory in response to some action. Yes, that can be a call from main, but only once for the whole content of the directory, and that's all.
And for this reason the Sftp.outboundGateway() with Command.MPUT is there for you:
https://docs.spring.io/spring-integration/reference/html/sftp.html#using-the-mput-command
You can still trigger an IntegrationFlow, but it could start from a @MessagingGateway interface, called from main with a local directory whose files are to be listed for uploading:
https://docs.spring.io/spring-integration/reference/html/dsl.html#java-dsl-gateway
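The one-shot idea can be sketched in plain Java roughly as follows. UploadGateway is a hypothetical stand-in for a @MessagingGateway interface wired to a flow that ends in Sftp.outboundGateway() with MPUT; no Spring classes are used, so this only illustrates the control flow, under the assumption that the gateway call blocks until the batch has been processed.

```java
import java.io.File;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Illustration of "list once, upload once"; not the actual Spring wiring.
public class OneShotUpload {

    // Hypothetical stand-in for a @MessagingGateway interface.
    public interface UploadGateway {
        void uploadFiles(List<File> files);
    }

    // Lists the directory once and hands the whole batch to the gateway in a single call.
    // Returns the number of files handed over; there is no poller to start or stop.
    public static int uploadOnce(File localDir, UploadGateway gateway) {
        File[] found = localDir.listFiles(File::isFile); // null if localDir is not a readable directory
        List<File> batch = found == null ? Collections.<File>emptyList() : Arrays.asList(found);
        if (!batch.isEmpty()) {
            gateway.uploadFiles(batch); // assumed to block until the batch has been processed
        }
        return batch.size();
    }
}
```

With this shape, the question of when to call adapter.stop() does not arise: when uploadOnce returns, the single batch has been handed over and main can simply continue or exit.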
Upvotes: 1