Reputation: 2268
I have configured Spring Integration with these beans:
private static final Pattern FILE_PATTERN = Pattern.compile("<pattern>");

@Bean
public SessionFactory<FTPFile> ftpSessionFactory(){
    DefaultFtpSessionFactory factory = new DefaultFtpSessionFactory();
    factory.setHost("localhost");
    factory.setPort(21);
    factory.setUsername("root");
    factory.setPassword("123456");
    factory.setClientMode(FTPClient.PASSIVE_LOCAL_DATA_CONNECTION_MODE);
    return new CachingSessionFactory<>(factory);
}

@Bean
public ConcurrentMetadataStore metadataStore(){
    PropertiesPersistingMetadataStore store = new PropertiesPersistingMetadataStore();
    store.setFileName("ftpStore.properties");
    return store;
}

@Bean(destroyMethod = "close")
public DataSource selectDataSource(){
    HikariDataSource dataSource = new HikariDataSource();
    dataSource.setJdbcUrl("jdbc:mysql://10.10.10.10:33306/csv");
    dataSource.setUsername("root");
    dataSource.setPassword("123456");
    return dataSource;
}

@Bean
public PlatformTransactionManager transactionManager(){
    return new DataSourceTransactionManager(selectDataSource());
}

@Bean
public TransactionSynchronizationFactory synchronizationFactory(){
    return new DefaultTransactionSynchronizationFactory(new TransactionSynchronizationProcessor() {
        @Override
        public void processBeforeCommit(IntegrationResourceHolder integrationResourceHolder) {
            int x = 22; //???
        }

        @Override
        public void processAfterCommit(IntegrationResourceHolder integrationResourceHolder) {
            int x = 22; //???
        }

        @Override
        public void processAfterRollback(IntegrationResourceHolder integrationResourceHolder) {
            int x = 22; //???
        }
    });
}

@Bean
public PollerMetadata pollerMetadata(PlatformTransactionManager transactionManager){
    PeriodicTrigger trigger = new PeriodicTrigger(5000);
    trigger.setFixedRate(true);
    MatchAlwaysTransactionAttributeSource source = new MatchAlwaysTransactionAttributeSource();
    source.setTransactionAttribute(new DefaultTransactionAttribute());
    TransactionInterceptor interceptor = new TransactionInterceptor(transactionManager, source);
    PollerMetadata metadata = new PollerMetadata();
    metadata.setTrigger(trigger);
    metadata.setTransactionSynchronizationFactory(synchronizationFactory());
    metadata.setAdviceChain(Collections.singletonList(interceptor));
    return metadata;
}

@Bean
@InboundChannelAdapter(channel = "ftpChannel", poller = @Poller("pollerMetadata"))
public MessageSource<InputStream> ftpMessageSource(){
    FtpStreamingMessageSource source = new FtpStreamingMessageSource(new FtpRemoteFileTemplate(ftpSessionFactory()));
    source.setRemoteDirectory("ftp/folder");
    source.setFilter(new CompositeFileListFilter<>(Arrays.asList(
            new FtpRegexPatternFileListFilter(FILE_PATTERN),
            acceptOnceFileListFilter()
    )));
    return source;
}

@Bean
public FtpPersistentAcceptOnceFileListFilter acceptOnceFileListFilter(){
    FtpPersistentAcceptOnceFileListFilter filter = new FtpPersistentAcceptOnceFileListFilter(metadataStore(), "remote");
    filter.setFlushOnUpdate(true);
    return filter;
}

@Bean
@ServiceActivator(inputChannel = "newChannel")
public MessageHandler handler(){
    return new MessageHandler(){
        @Override
        public void handleMessage(Message<?> message) throws MessagingException {
            System.out.println(message.getPayload());
            throw new MessagingException("error");
        }
    };
}

@Bean
public MessageChannel ftpChannel(){
    return new DirectChannel();
}

@Bean
public MessageChannel newChannel(){
    return new DirectChannel();
}

@Bean
public MessageChannel strChannel(){
    return new DirectChannel();
}

@Bean
@Transformer(inputChannel = "ftpChannel", outputChannel = "strChannel")
public org.springframework.integration.transformer.Transformer transformer2(){
    return new StreamTransformer("UTF-8");
}

@Bean
@Transformer(inputChannel = "strChannel", outputChannel = "newChannel")
public UnmarshallingTransformer transformer(){
    UnmarshallingTransformer transformer = new UnmarshallingTransformer(unmarshaller());
    return transformer;
}

@Bean
public Jaxb2Marshaller unmarshaller(){
    Jaxb2Marshaller unmarshaller = new Jaxb2Marshaller();
    unmarshaller.setContextPath("com.generated.xsd");
    return unmarshaller;
}
My problem is that when throw new MessagingException("error"); is reached, all FTP files have already been saved to ftpStore.properties, so after the next restart (e.g. if the JVM fails) these files will never be processed again. How can I make sure a transaction is in place, that is, a file is saved to ftpStore.properties only if there is no exception? Is there a tutorial to follow for a failure-proof download of files from an FTP server?
Upvotes: 1
Views: 647
Reputation: 121382
The ResettableFileListFilter abstraction exists to deal with exactly this, and your FtpPersistentAcceptOnceFileListFilter is one such filter:
If, after synchronizing the files, an error occurs on the downstream flow processing a file, there is no automatic rollback of the filter so the failed file will not be reprocessed by default.
If you wish to reprocess such files after a failure, you can use configuration similar to the following to facilitate the removal of the failed file from the filter. This will work for any ResettableFileListFilter.
The XML config sample is like this:
<int:transaction-synchronization-factory id="syncFactory">
    <int:after-rollback expression="@acceptOnceFilter.remove(payload)" />
</int:transaction-synchronization-factory>
So what you need is to build your synchronizationFactory bean from a DefaultTransactionSynchronizationFactory that wraps an ExpressionEvaluatingTransactionSynchronizationProcessor; these are the Java counterparts of the two XML elements above.
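A minimal Java sketch of that configuration follows, as a replacement for the synchronizationFactory bean in the question. The class name FtpRollbackConfig and the bean name syncProcessor are made up for illustration; the expression refers to the question's acceptOnceFileListFilter bean. One assumption to watch: the expression passes payload to remove(), which only fits when the payload is the file object the filter accepts. With the FtpStreamingMessageSource from the question the payload is an InputStream, so the expression would probably have to identify the entry another way (for instance through the file_remoteFile header). That part is untested here.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.integration.transaction.DefaultTransactionSynchronizationFactory;
import org.springframework.integration.transaction.ExpressionEvaluatingTransactionSynchronizationProcessor;
import org.springframework.integration.transaction.TransactionSynchronizationFactory;

@Configuration
public class FtpRollbackConfig { // hypothetical class name

    // Declared as a bean (hypothetical name) so the container can supply the
    // BeanFactory that is needed to resolve '@beanName' references in SpEL.
    @Bean
    public ExpressionEvaluatingTransactionSynchronizationProcessor syncProcessor() {
        ExpressionEvaluatingTransactionSynchronizationProcessor processor =
                new ExpressionEvaluatingTransactionSynchronizationProcessor();
        // Java counterpart of <int:after-rollback expression="..."/>.
        // Assumption: 'payload' is the file object accepted by the filter;
        // adapt the expression if the payload is an InputStream.
        processor.setAfterRollbackExpression(
                new SpelExpressionParser()
                        .parseExpression("@acceptOnceFileListFilter.remove(payload)"));
        return processor;
    }

    // Java counterpart of <int:transaction-synchronization-factory/>.
    @Bean
    public TransactionSynchronizationFactory synchronizationFactory() {
        return new DefaultTransactionSynchronizationFactory(syncProcessor());
    }
}
```

The poller in the question already sets both the transaction interceptor and the synchronization factory, so once the handler's exception rolls the transaction back, the after-rollback expression should be evaluated against the message produced by the message source.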
Upvotes: 1