Reputation: 1649
I am building an application using Spring Integration which is used to send files from one FTP server (source) to another FTP server (target). I first send files from source to the local directory using the inbound adapter and then send files from the local directory to the target using the outbound adapter.
My code seems to be working fine and I am able to achieve my goal but my problem is when the connection is reset to the target FTP server during the transfer of files, then the transfer of files don't continue after the connection starts working.
I used the Java configurations using inbound and outbound adapters. Can anyone please tell me if it is possible to resume my transfer of files somehow after the connection reset?
P.S: I am a beginner at Spring, so correct me if I have done something wrong here. Thanks
AppConfig.java:
@Configuration
@Component
public class FileTransferServiceConfig {
@Autowired
private ConfigurationService configurationService;
public static final String FILE_POLLING_DURATION = "5000";
@Bean
public SessionFactory<FTPFile> sourceFtpSessionFactory() {
DefaultFtpSessionFactory sf = new DefaultFtpSessionFactory();
sf.setHost(configurationService.getSourceHostName());
sf.setPort(Integer.parseInt(configurationService.getSourcePort()));
sf.setUsername(configurationService.getSourceUsername());
sf.setPassword(configurationService.getSourcePassword());
return new CachingSessionFactory<FTPFile>(sf);
}
@Bean
public SessionFactory<FTPFile> targetFtpSessionFactory() {
DefaultFtpSessionFactory sf = new DefaultFtpSessionFactory();
sf.setHost(configurationService.getTargetHostName());
sf.setPort(Integer.parseInt(configurationService.getTargetPort()));
sf.setUsername(configurationService.getTargetUsername());
sf.setPassword(configurationService.getTargetPassword());
return new CachingSessionFactory<FTPFile>(sf);
}
@MessagingGateway
public interface MyGateway {
@Gateway(requestChannel = "toFtpChannel")
void sendToFtp(Message message);
}
@Bean
public FtpInboundFileSynchronizer ftpInboundFileSynchronizer() {
FtpInboundFileSynchronizer fileSynchronizer = new FtpInboundFileSynchronizer(sourceFtpSessionFactory());
fileSynchronizer.setDeleteRemoteFiles(false);
fileSynchronizer.setRemoteDirectory(configurationService.getSourceDirectory());
fileSynchronizer.setFilter(new FtpSimplePatternFileListFilter(
configurationService.getFileMask()));
return fileSynchronizer;
}
@Bean
@InboundChannelAdapter(channel = "ftpChannel",
poller = @Poller(fixedDelay = FILE_POLLING_DURATION ))
public MessageSource<File> ftpMessageSource() {
FtpInboundFileSynchronizingMessageSource source =
new FtpInboundFileSynchronizingMessageSource(ftpInboundFileSynchronizer());
source.setLocalDirectory(new File(configurationService.getLocalDirectory()));
source.setAutoCreateLocalDirectory(true);
source.setLocalFilter(new AcceptOnceFileListFilter<File>());
return source;
}
@Bean
@ServiceActivator(inputChannel = "ftpChannel")
public MessageHandler targetHandler() {
FtpMessageHandler handler = new FtpMessageHandler(targetFtpSessionFactory());
handler.setRemoteDirectoryExpression(new LiteralExpression(
configurationService.getTargetDirectory()));
return handler;
}
}
Application.java:
@SpringBootApplication
public class Application {
public static ConfigurableApplicationContext context;
public static void main(String[] args) {
context = new SpringApplicationBuilder(Application.class)
.web(false)
.run(args);
}
@Bean
@ServiceActivator(inputChannel = "ftpChannel")
public MessageHandler sourceHandler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
Object payload = message.getPayload();
System.out.println("Payload: " + payload);
if (payload instanceof File) {
File file = (File) payload;
System.out.println("Trying to send " + file.getName() + " to target");
}
MyGateway gateway = context.getBean(MyGateway.class);
gateway.sendToFtp(message);
}
};
}
}
Upvotes: 1
Views: 560
Reputation: 121292
First of all it isn't clear what is that sourceHandler
for, but you really should be sure that it is subscribed (or targetHandler
) to proper channel.
I somehow believe that in your target code the targetHandler
is really subscribed to the toFtpChannel
.
Anyway that isn't related.
I think the problem here is exactly with the AcceptOnceFileListFilter
and error. So, filter work first during directory scan and load all the local files to the in-memory queue for performance reason. Then all of them are sent to the channel for processing. When we reach the targetHandler
and got an exception, we just silently got away to the global errorChannel
loosing the fact that file hasn't been transferred. And this happens with all the remaining files in memory. I think the transfer is resumed anyway but it is going work already only for new files in the remote directory.
I suggest you to add ExpressionEvaluatingRequestHandlerAdvice
to the targetHandler
definition (@ServiceActivator(adviceChain)
) and in case of error call the AcceptOnceFileListFilter.remove(File)
:
/**
* Remove the specified file from the filter so it will pass on the next attempt.
* @param f the element to remove.
* @return true if the file was removed as a result of this call.
*/
boolean remove(F f);
This way you remove the failed files from the filter and it will be picked up on the next poll task. You have to make AcceptOnceFileListFilter
to be able to get an access to it from the onFailureExpression
. The file is the payload
of request message.
EDIT
The sample for the ExpressionEvaluatingRequestHandlerAdvice
:
@Bean
public Advice expressionAdvice() {
ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
advice.setOnFailureExpressionString("@acceptOnceFileListFilter.remove(payload)");
advice.setTrapException(true);
return advice;
}
...
@ServiceActivator(inputChannel = "ftpChannel", adviceChain = "expressionAdvice")
Everything rest you can get from their JavaDocs.
Upvotes: 1