Reputation: 43
I have a spring integration application, which I want to shutdown gracefully. The application runs within a docker container and the flow I want to shutdown transfers a decent amount of XML Files from one external system to another.
The requirements are: If the application has to shutdown, the current file transfer should complete and no further files should be touched after.
What I've learned and done so far: - docker stop sends a SIGTERM to the containers main process followed by a SIGKILL after 10 seconds (configurable with the --time=x option) - I implemented an ApplicationListener and registered it as @Bean, so it will be registered at the applications context. - the Flow uses a poller with a transactionManager, so the ApplicationListener can determine if the poller has an open transaction and if so, the Listener waits an amount of time.
My problem is now: With this solution I can wait until the current file transfer is finished, but I cannot tell the flow to stop reading inbound files. If the transfer completed and another file arrived while the ApplicationListener was waiting, the Flow will grab the file and start another transfer, which probably will abandon while the SIGKILL arrives. The injection of the flow as Lifecycle and the call to stop() don't seem to work as I thought.
My question is, is there a way to tell the Flow, that he should finish his work but should not listen on any arriving messages?
Here's my code so far: OutboundFlow:
@Bean
public PseudoTransactionManager transactionManager() {
return new PseudoTransactionManager();
}
@Bean
public TransactionSynchronizationFactory transactionSynchronizationFactory() {
final ExpressionEvaluatingTransactionSynchronizationProcessor processor = new ExpressionEvaluatingTransactionSynchronizationProcessor();
processor.setBeanFactory(beanFactory);
return new DefaultTransactionSynchronizationFactory(processor);
}
@Bean
public PollerSpec orderOutboundFlowTempFileInPoller() {
return Pollers
.fixedDelay(pollerDelay)
.maxMessagesPerPoll(100)
.transactional(transactionManager())
.transactionSynchronizationFactory(transactionSynchronizationFactory());
}
@Bean
public IntegrationFlow orderOutboundFlowTempFileIn() {
return IntegrationFlows
.from(Files.inboundAdapter(new File(temporaryPath + '/' + OrderUtils.SUBDIR_TMP_ORDER))
.filterFunction(
f -> OrderUtils.fileInputFilter(f, partnerConfigRepo, "orderOutboundFlowTempFileIn")),
e -> e.poller(orderOutboundFlowTempFileInPoller())) ...
Application:
public static void main(final String[] args) throws Exception {
SpringApplication.run(App.class, args);
}
@Bean
public GracefulShutdown gracefulShutdown() {
return new GracefulShutdown();
}
private static class GracefulShutdown implements ApplicationListener<ContextClosedEvent> {
private static final Logger LOG = LoggerFactory.getLogger(GracefulShutdown.class);
@Autowired
private Lifecycle orderOutboundFlowTempFileIn;
@Override public void onApplicationEvent(ContextClosedEvent event) {
LOG.info("Trying to gracefully shutdown App");
ApplicationContext context = event.getApplicationContext();
PollerSpec outboundFlowTempFileInPoller = context.getBean(PollerSpec.class, "orderOutboundFlowTempFileInPoller");
orderOutboundFlowTempFileIn.stop();
TransactionInterceptor transactionManager = (TransactionInterceptor) (outboundFlowTempFileInPoller.get()
.getAdviceChain()
.iterator().next());
if (transactionManager.getTransactionManager() instanceof AbstractPlatformTransactionManager) {
final TransactionStatus transaction = transactionManager.getTransactionManager().getTransaction(null);
LOG.info("This is the transaction: " + transaction.toString() + ", isActive? " + !transaction.isCompleted());
while (!transaction.isCompleted()) {
try {
LOG.info("Still active, waiting 30 more seconds");
Thread.sleep(30000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
LOG.info("Transaction completed");
}
}
}
Upvotes: 1
Views: 1668
Reputation: 121550
You actually have just to stop a SourcePollingChannelAdapter
for the Files.inboundAdapter()
. For that purpose you need to add an .id()
to the e
lambda.
And use that id to retrieve a SourcePollingChannelAdapter
when you need to stop it.
This way you stop to receive new files immediately and those on-the-fly are going to be finished properly.
There is no reason to stop the whole flow from here, since all downstream components are passive.
Upvotes: 1