Marouane Gazanayi

Reputation: 5183

FTP file not downloaded with Spring Integration after local deletion

We are writing a batch job that takes a file from an FTP server as input, generates some new files, and writes them to an S3 bucket; for this we are using Spring Integration.

The file on the FTP server is an extraction from a DB and is updated each night.

The problem is that, when we start the app the first time, it connects to the FTP server, downloads the file, and uploads the generated result to S3. Then we delete the downloaded file locally and wait for the next generation of the file on the FTP server to restart the process. But it never downloads the file again.

Any idea?

    @Bean
    public IntegrationFlow ftpInboundFlow() {
        return IntegrationFlows
                .from(ftpReader(),
                        spec -> spec.id("ftpInboundAdapter")
                                .autoStartup(true)
                                .poller(Pollers.fixedDelay(period)))
                .enrichHeaders(Map.of("CORRELATION_ID", "rcm"))
                .aggregate(aggregatorSpec -> aggregatorSpec
                        .correlationStrategy(message -> message.getHeaders().get("CORRELATION_ID"))
                        .releaseStrategy(group -> group.getMessages().size() == 2))
                .transform(stockUnmarshaller)
                .transform(stockTransformer)
                .transform(stockMarshaller)
                .transform(picturesDownloader)
                .transform(picturesZipper)
                .transform(stockIndexer)
                .handle(directoryCleaner)
                .nullChannel();
    }

    @Bean
    public FtpInboundChannelAdapterSpec ftpReader() {
        return Ftp.inboundAdapter(ftpSessionFactory())
                .preserveTimestamp(true)
                .remoteDirectory(rootFolder)
                .autoCreateLocalDirectory(true)
                .localDirectory(new File(localDirectory));
    }

    @Bean
    public SessionFactory<FTPFile> ftpSessionFactory() {
        DefaultFtpSessionFactory sessionFactory = new DefaultFtpSessionFactory();
        sessionFactory.setHost(host);
        sessionFactory.setUsername(userName);
        sessionFactory.setPassword(password);
        sessionFactory.setClientMode(FTPClient.PASSIVE_LOCAL_DATA_CONNECTION_MODE);
        return sessionFactory;
    }

Thanks in advance.

EDIT:

I use enrichHeaders to ensure that the pipeline is triggered only when we have exactly 2 files. Maybe the headers are not removed and the group size ends up always greater than 2? Or is this the wrong way to proceed?

Thanks again.

Upvotes: 0

Views: 349

Answers (2)

Artem Bilan

Reputation: 121282

It sounds like you are talking about the same file. In that case, deleting it from the local directory is not enough. There are FileListFilter instances involved in the process which hold an entry for the processed file, and according to your configuration you are dealing with the in-memory variants. They know nothing about your local file removal.

To be precise, there are two filters you need to worry about: FtpPersistentAcceptOnceFileListFilter for the remote entry and FileSystemPersistentAcceptOnceFileListFilter for the local copy of the file. Both implement ResettableFileListFilter, so you can call their remove() whenever you are done processing a file.

The FtpInboundChannelAdapterSpec in Java DSL has these options:

/**
 * Configure a {@link FileListFilter} to be applied to the remote files before
 * copying them.
 * @param filter the filter.
 * @return the spec.
 */
public S filter(FileListFilter<F> filter) {

/**
 * A {@link FileListFilter} used to determine which files will generate messages
 * after they have been synchronized.
 * @param localFileListFilter the localFileListFilter.
 * @return the spec.
 * @see AbstractInboundFileSynchronizingMessageSource#setLocalFilter(FileListFilter)
 */
public S localFilter(FileListFilter<File> localFileListFilter) {

So, you can still keep those filters with their default behavior, but extract them as beans, inject them into these options, and also inject them into your directoryCleaner to remove the entries from the filters as well.
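A sketch of that wiring, reusing the bean and field names from the question (the metadata-store key prefixes here are arbitrary):

```java
// Hypothetical wiring: the two default filters extracted as beans so that a
// cleanup component can call remove() on them after the local file is deleted.
@Bean
public FtpPersistentAcceptOnceFileListFilter remoteFilter() {
    return new FtpPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "ftp-");
}

@Bean
public FileSystemPersistentAcceptOnceFileListFilter localFileFilter() {
    return new FileSystemPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "local-");
}

@Bean
public FtpInboundChannelAdapterSpec ftpReader() {
    return Ftp.inboundAdapter(ftpSessionFactory())
            .preserveTimestamp(true)
            .remoteDirectory(rootFolder)
            .filter(remoteFilter())            // remote-side filter as a bean
            .localFilter(localFileFilter())    // local-side filter as a bean
            .autoCreateLocalDirectory(true)
            .localDirectory(new File(localDirectory));
}
```

Your directoryCleaner would then call `localFileFilter().remove(file)` (and `remoteFilter().remove(...)` with the corresponding FTPFile entry) right after deleting the local copy, so the next poll can pick the file up again.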

There is also an option like:

/**
 * Switch the local {@link FileReadingMessageSource} to use its internal
 * {@code FileReadingMessageSource.WatchServiceDirectoryScanner}.
 * @param useWatchService the {@code boolean} flag to switch to
 * {@code FileReadingMessageSource.WatchServiceDirectoryScanner} on {@code true}.
 * @since 5.0
 */
public void setUseWatchService(boolean useWatchService) {

A DELETE event is configured for the watcher as well. When it happens, the removed file is also deleted from the local filter.
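A sketch of switching to the watch service, assuming you define the synchronizing message source explicitly (the `ftpSynchronizer()` bean is hypothetical and would wrap the session factory and remote directory):

```java
// Hypothetical bean definition: the message source behind the inbound adapter,
// with the watch service enabled so local DELETE events reset the local filter.
@Bean
public FtpInboundFileSynchronizingMessageSource ftpMessageSource() {
    FtpInboundFileSynchronizingMessageSource source =
            new FtpInboundFileSynchronizingMessageSource(ftpSynchronizer());
    source.setLocalDirectory(new File(localDirectory));
    source.setAutoCreateLocalDirectory(true);
    // On a DELETE event the watcher removes the entry from the local filter,
    // so deleting the local copy lets the file be re-emitted later.
    source.setUseWatchService(true);
    return source;
}
```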

You can also handle a changed remote file properly when you configure:

/**
 * Set to true to enable the preservation of the remote file timestamp when transferring.
 * @param preserveTimestamp true to preserve.
 * @return the spec.
 */
public S preserveTimestamp(boolean preserveTimestamp) {

This way a newer file with the same name will be treated as a different file, and its entry in the mentioned filters will be overwritten. I see you already use it, but you still report that it doesn't work. That might happen with an old version of Spring Integration, when FileSystemPersistentAcceptOnceFileListFilter was not yet used for local files.

Upvotes: 2

Gary Russell

Reputation: 174574

The inbound channel adapter has two filters: .filter and .localFilter.

The first filters the remote files before downloading, the second filters files on the file system.

By default, the filter is an FtpPersistentAcceptOnceFileListFilter, which will only fetch new or changed files.

By default, the localFilter is a FileSystemPersistentAcceptOnceFileListFilter which, again, will only pass a file a second time if its timestamp has changed.

So the file will only be reprocessed if its timestamp changes.
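The accept-once-with-timestamp behavior boils down to a metadata map keyed by file name. A minimal plain-Java sketch of that idea (not the actual Spring classes, which also persist the map via a MetadataStore):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of accept-once semantics: a file passes only if it is unknown
// or its modified timestamp is newer than the one recorded.
class AcceptOnceByTimestamp {
    private final Map<String, Long> seen = new HashMap<>();

    boolean accept(String name, long lastModified) {
        Long previous = seen.get(name);
        if (previous == null || lastModified > previous) {
            seen.put(name, lastModified);
            return true;
        }
        return false;
    }

    // Mirror of ResettableFileListFilter.remove(): forget the entry so the
    // same file can be accepted again even with an unchanged timestamp.
    boolean remove(String name) {
        return seen.remove(name) != null;
    }
}
```

This is why deleting only the local copy changes nothing: the map entry survives, so the same name/timestamp pair keeps being rejected until the timestamp changes or remove() is called.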

I suggest you run it in a debugger to see why the file is not passing the filter.

Upvotes: 1
