Bojan Vukasovic

Reputation: 2268

FtpStreamingMessageSource - retry on failure

I have configured Spring Integration with these beans:

private static final Pattern FILE_PATTERN = Pattern.compile("<pattern>");

@Bean
public SessionFactory<FTPFile> ftpSessionFactory(){
    DefaultFtpSessionFactory factory = new DefaultFtpSessionFactory();
    factory.setHost("localhost");
    factory.setPort(21);
    factory.setUsername("root");
    factory.setPassword("123456");
    factory.setClientMode(FTPClient.PASSIVE_LOCAL_DATA_CONNECTION_MODE);
    return new CachingSessionFactory<>(factory);
}

@Bean
public ConcurrentMetadataStore metadataStore(){
    PropertiesPersistingMetadataStore store = new PropertiesPersistingMetadataStore();
    store.setFileName("ftpStore.properties");
    return store;
}

@Bean(destroyMethod = "close")
public DataSource selectDataSource(){
    HikariDataSource dataSource = new HikariDataSource();
    dataSource.setJdbcUrl("jdbc:mysql://10.10.10.10:33306/csv");
    dataSource.setUsername("root");
    dataSource.setPassword("123456");
    return dataSource;
}

@Bean
public PlatformTransactionManager transactionManager(){
    return new DataSourceTransactionManager(selectDataSource());
}

@Bean
public TransactionSynchronizationFactory synchronizationFactory(){
    return new DefaultTransactionSynchronizationFactory(new TransactionSynchronizationProcessor() {
        @Override
        public void processBeforeCommit(IntegrationResourceHolder integrationResourceHolder) {
            int x = 22; //???
        }

        @Override
        public void processAfterCommit(IntegrationResourceHolder integrationResourceHolder) {
            int x = 22; //???
        }

        @Override
        public void processAfterRollback(IntegrationResourceHolder integrationResourceHolder) {
            int x = 22; //???
        }
    });
}

@Bean
public PollerMetadata pollerMetadata(PlatformTransactionManager transactionManager){
    PeriodicTrigger trigger = new PeriodicTrigger(5000);
    trigger.setFixedRate(true);

    MatchAlwaysTransactionAttributeSource source = new MatchAlwaysTransactionAttributeSource();
    source.setTransactionAttribute(new DefaultTransactionAttribute());
    TransactionInterceptor interceptor = new TransactionInterceptor(transactionManager, source);

    PollerMetadata metadata = new PollerMetadata();
    metadata.setTrigger(trigger);
    metadata.setTransactionSynchronizationFactory(synchronizationFactory());
    metadata.setAdviceChain(Collections.singletonList(interceptor));
    return metadata;
}

@Bean
@InboundChannelAdapter(channel = "ftpChannel", poller = @Poller("pollerMetadata"))
public MessageSource<InputStream> ftpMessageSource(){
    FtpStreamingMessageSource source = new FtpStreamingMessageSource(new FtpRemoteFileTemplate(ftpSessionFactory()));
    source.setRemoteDirectory("ftp/folder");
    source.setFilter(new CompositeFileListFilter<>(Arrays.asList(
            new FtpRegexPatternFileListFilter(FILE_PATTERN),
            acceptOnceFileListFilter()
    )));
    return source;
}

@Bean
public FtpPersistentAcceptOnceFileListFilter acceptOnceFileListFilter(){
    FtpPersistentAcceptOnceFileListFilter filter = new FtpPersistentAcceptOnceFileListFilter(metadataStore(), "remote");
    filter.setFlushOnUpdate(true);
    return filter;
}

@Bean
@ServiceActivator(inputChannel = "newChannel")
public MessageHandler handler(){
    return new MessageHandler(){
        @Override
        public void handleMessage(Message<?> message) throws MessagingException {
            System.out.println(message.getPayload());
            throw new MessagingException("error");
        }
    };
}

@Bean
public MessageChannel ftpChannel(){
    return new DirectChannel();
}

@Bean
public MessageChannel newChannel(){
    return new DirectChannel();
}

@Bean
public MessageChannel strChannel(){
    return new DirectChannel();
}

@Bean
@Transformer(inputChannel = "ftpChannel", outputChannel = "strChannel")
public org.springframework.integration.transformer.Transformer transformer2(){
    return new StreamTransformer("UTF-8");
}

@Bean
@Transformer(inputChannel = "strChannel", outputChannel = "newChannel")
public UnmarshallingTransformer transformer(){
    UnmarshallingTransformer transformer = new UnmarshallingTransformer(unmarshaller());
    return transformer;
}

@Bean
public Jaxb2Marshaller unmarshaller(){
    Jaxb2Marshaller unmarshaller = new Jaxb2Marshaller();
    unmarshaller.setContextPath("com.generated.xsd");
    return unmarshaller;
}

My problem is that when throw new MessagingException("error"); is reached, all FTP files have already been saved to ftpStore.properties, so after the next restart (e.g. if the JVM fails) these files will never be processed again. How can I make this transactional, so that a file is saved to ftpStore.properties only if no exception occurs? Is there a tutorial to follow for failure-proof downloading of files from an FTP server?

Upvotes: 1

Views: 647

Answers (1)

Artem Bilan

Reputation: 121382

There is a ResettableFileListFilter abstraction for dealing with exactly this.

And in fact your FtpPersistentAcceptOnceFileListFilter is one such filter:

If, after synchronizing the files, an error occurs on the downstream flow processing a file, there is no automatic rollback of the filter so the failed file will not be reprocessed by default.

If you wish to reprocess such files after a failure, you can use configuration similar to the following to facilitate the removal of the failed file from the filter. This will work for any ResettableFileListFilter.

The XML configuration sample looks like this:

<int:transaction-synchronization-factory id="syncFactory">
    <int:after-rollback expression="@acceptOnceFilter.remove(payload)" />
</int:transaction-synchronization-factory>
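
To make the effect of that remove(...) call concrete, here is a plain-Java sketch of the accept-once behaviour. It does not use Spring Integration at all: ResettableAcceptOnceFilter and its methods are hypothetical stand-ins written for illustration, and files are represented by their names only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class AcceptOnceSketch {

    /** Hypothetical stand-in for a resettable accept-once filter (not the Spring class). */
    static class ResettableAcceptOnceFilter {
        private final Set<String> seen = new HashSet<>();

        /** Returns only the names not seen before and records them as seen. */
        List<String> filterFiles(List<String> names) {
            List<String> accepted = new ArrayList<>();
            for (String name : names) {
                if (seen.add(name)) {
                    accepted.add(name);
                }
            }
            return accepted;
        }

        /** Forgets a name so that the next poll accepts it again. */
        boolean remove(String name) {
            return seen.remove(name);
        }
    }

    public static void main(String[] args) {
        ResettableAcceptOnceFilter filter = new ResettableAcceptOnceFilter();
        List<String> remote = Arrays.asList("a.xml");

        System.out.println(filter.filterFiles(remote)); // first poll: [a.xml]
        System.out.println(filter.filterFiles(remote)); // second poll without reset: []
        filter.remove("a.xml");                          // what an after-rollback hook would do
        System.out.println(filter.filterFiles(remote)); // third poll: [a.xml]
    }
}
```

Without the remove call the file stays recorded after the first poll, which matches the behaviour described in the question.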

So you need to change your synchronizationFactory bean to return a DefaultTransactionSynchronizationFactory built on an ExpressionEvaluatingTransactionSynchronizationProcessor whose after-rollback expression removes the failed file from the filter.

See "Recovering from Failures" in the Spring Integration reference documentation.
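
A Java configuration along these lines might look like the sketch below. It is an untested translation of the XML sample, not a verified fix. The class name SyncFactoryConfig and the bean name synchronizationProcessor are made up for illustration. The expression refers to the acceptOnceFileListFilter bean from the question. Note that the question's source produces InputStream payloads, so the expression may need adapting to pass whatever object the filter's remove method actually expects.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.integration.transaction.DefaultTransactionSynchronizationFactory;
import org.springframework.integration.transaction.ExpressionEvaluatingTransactionSynchronizationProcessor;
import org.springframework.integration.transaction.TransactionSynchronizationFactory;

@Configuration
public class SyncFactoryConfig { // hypothetical class name

    // Declared as a bean so that the container can supply the BeanFactory
    // needed to resolve the @acceptOnceFileListFilter reference in the expression.
    @Bean
    public ExpressionEvaluatingTransactionSynchronizationProcessor synchronizationProcessor() {
        ExpressionEvaluatingTransactionSynchronizationProcessor processor =
                new ExpressionEvaluatingTransactionSynchronizationProcessor();
        // Assumption: expression taken from the XML sample, with the bean name
        // changed to the one defined in the question.
        processor.setAfterRollbackExpression(new SpelExpressionParser()
                .parseExpression("@acceptOnceFileListFilter.remove(payload)"));
        return processor;
    }

    // Replaces the anonymous no-op processor from the question.
    @Bean
    public TransactionSynchronizationFactory synchronizationFactory(
            ExpressionEvaluatingTransactionSynchronizationProcessor synchronizationProcessor) {
        return new DefaultTransactionSynchronizationFactory(synchronizationProcessor);
    }
}
```

The poller in the question already passes synchronizationFactory() to PollerMetadata, so only the processor behind the factory changes.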

Upvotes: 1
