Sibtain
Sibtain

Reputation: 1649

Resume transfer of files after connection reset FTP

I am building an application using Spring Integration which is used to send files from one FTP server (source) to another FTP server (target). I first send files from source to the local directory using the inbound adapter and then send files from the local directory to the target using the outbound adapter.

My code seems to be working fine and I am able to achieve my goal but my problem is when the connection is reset to the target FTP server during the transfer of files, then the transfer of files don't continue after the connection starts working.

I used the Java configurations using inbound and outbound adapters. Can anyone please tell me if it is possible to resume my transfer of files somehow after the connection reset?

P.S: I am a beginner at Spring, so correct me if I have done something wrong here. Thanks

AppConfig.java:

@Configuration
@Component
public class FileTransferServiceConfig {

    @Autowired
    private ConfigurationService configurationService;

    public static final String FILE_POLLING_DURATION = "5000";

    @Bean
    public SessionFactory<FTPFile> sourceFtpSessionFactory() {
        DefaultFtpSessionFactory sf = new DefaultFtpSessionFactory();
        sf.setHost(configurationService.getSourceHostName());
        sf.setPort(Integer.parseInt(configurationService.getSourcePort()));
        sf.setUsername(configurationService.getSourceUsername());
        sf.setPassword(configurationService.getSourcePassword());
        return new CachingSessionFactory<FTPFile>(sf);
    }

    @Bean
    public SessionFactory<FTPFile> targetFtpSessionFactory() {
        DefaultFtpSessionFactory sf = new DefaultFtpSessionFactory();
        sf.setHost(configurationService.getTargetHostName());
        sf.setPort(Integer.parseInt(configurationService.getTargetPort()));
        sf.setUsername(configurationService.getTargetUsername());
        sf.setPassword(configurationService.getTargetPassword());
        return new CachingSessionFactory<FTPFile>(sf);
    }

    @MessagingGateway
    public interface MyGateway {

         @Gateway(requestChannel = "toFtpChannel")
         void sendToFtp(Message message);

    }

    @Bean
    public FtpInboundFileSynchronizer ftpInboundFileSynchronizer() {
        FtpInboundFileSynchronizer fileSynchronizer = new FtpInboundFileSynchronizer(sourceFtpSessionFactory());
        fileSynchronizer.setDeleteRemoteFiles(false);
        fileSynchronizer.setRemoteDirectory(configurationService.getSourceDirectory());
        fileSynchronizer.setFilter(new FtpSimplePatternFileListFilter(
                configurationService.getFileMask()));
        return fileSynchronizer;
    }

    @Bean
    @InboundChannelAdapter(channel = "ftpChannel",
            poller = @Poller(fixedDelay = FILE_POLLING_DURATION ))
    public MessageSource<File> ftpMessageSource() {
        FtpInboundFileSynchronizingMessageSource source =
                new FtpInboundFileSynchronizingMessageSource(ftpInboundFileSynchronizer());
        source.setLocalDirectory(new File(configurationService.getLocalDirectory()));
        source.setAutoCreateLocalDirectory(true);
        source.setLocalFilter(new AcceptOnceFileListFilter<File>());
        return source;
    }



    @Bean
    @ServiceActivator(inputChannel = "ftpChannel")
    public MessageHandler targetHandler() {
        FtpMessageHandler handler = new FtpMessageHandler(targetFtpSessionFactory());
        handler.setRemoteDirectoryExpression(new LiteralExpression(
                configurationService.getTargetDirectory()));
        return handler;
    }    
}

Application.java:

@SpringBootApplication
public class Application {

    public static ConfigurableApplicationContext context;

    public static void main(String[] args) {
        context = new SpringApplicationBuilder(Application.class)
                .web(false)
                .run(args);
    }

    @Bean
    @ServiceActivator(inputChannel = "ftpChannel")
    public MessageHandler sourceHandler() {
        return new MessageHandler() {

            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                Object payload = message.getPayload();
                System.out.println("Payload: " + payload);
                if (payload instanceof File) {
                    File file = (File) payload;
                    System.out.println("Trying to send " + file.getName() + " to target");
                }
                MyGateway gateway = context.getBean(MyGateway.class);
                gateway.sendToFtp(message);
            }

        };
    }
}

Upvotes: 1

Views: 560

Answers (1)

Artem Bilan
Artem Bilan

Reputation: 121292

First of all it isn't clear what is that sourceHandler for, but you really should be sure that it is subscribed (or targetHandler) to proper channel.

I somehow believe that in your target code the targetHandler is really subscribed to the toFtpChannel.

Anyway that isn't related.

I think the problem here is exactly with the AcceptOnceFileListFilter and error. So, filter work first during directory scan and load all the local files to the in-memory queue for performance reason. Then all of them are sent to the channel for processing. When we reach the targetHandler and got an exception, we just silently got away to the global errorChannel loosing the fact that file hasn't been transferred. And this happens with all the remaining files in memory. I think the transfer is resumed anyway but it is going work already only for new files in the remote directory.

I suggest you to add ExpressionEvaluatingRequestHandlerAdvice to the targetHandler definition (@ServiceActivator(adviceChain)) and in case of error call the AcceptOnceFileListFilter.remove(File):

/**
 * Remove the specified file from the filter so it will pass on the next attempt.
 * @param f the element to remove.
 * @return true if the file was removed as a result of this call.
 */
boolean remove(F f);

This way you remove the failed files from the filter and it will be picked up on the next poll task. You have to make AcceptOnceFileListFilter to be able to get an access to it from the onFailureExpression. The file is the payload of request message.

EDIT

The sample for the ExpressionEvaluatingRequestHandlerAdvice:

@Bean
public Advice expressionAdvice() {
    ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
    advice.setOnFailureExpressionString("@acceptOnceFileListFilter.remove(payload)");
    advice.setTrapException(true);
    return advice;
}

...

@ServiceActivator(inputChannel = "ftpChannel", adviceChain = "expressionAdvice")

Everything rest you can get from their JavaDocs.

Upvotes: 1

Related Questions