Gaurav Rawat

Reputation: 1324

Strategy to refresh/update SessionFactory in Spring Integration

Hi, I am using Spring Integration extensively in my project. In the current case I create my FTP and SFTP adapters dynamically using dynamic flow registration. I also create the session factories dynamically, based on the persisted configuration for each unique connection.

This works great, but sometimes I need to modify an existing session config dynamically, and in that case the session factory has to be refreshed with the new session config. This can happen, for example, when credentials change.

To do this I am looking at two approaches:

  1. Remove the dynamic flow via flowContext.remove(flowId). Somehow this does not kill the flow: I still see the old session factory and flow running.
  2. Associate a running adapter with a new SessionFactory dynamically, if there is a way to do that. I have not found a way to accomplish this yet.

Please help

UPDATE

My dynamic registration code is below:

CachingSessionFactory<FTPFile> csf = cache.get(feed.getConnectionId());
IntegrationFlow flow = IntegrationFlows
        .from(inboundAdapter(csf).preserveTimestamp(true)//
                .remoteDirectory(feed.getRemoteDirectory())//
                .regexFilter(feed.getRegexFilter())//
                .deleteRemoteFiles(feed.getDeleteRemoteFiles())
                .autoCreateLocalDirectory(feed.getAutoCreateLocalDirectory())
                .localFilenameExpression(feed.getLocalFilenameExpression())//
                .localFilter(localFileFilter)//
                .localDirectory(new File(feed.getLocalDirectory())),
                e -> e.id(inboundAdapter.get(feed.getId())).autoStartup(false)
                        .poller(Pollers//
                                .cron(feed.getPollingFreq())//
                                .maxMessagesPerPoll(1)//
                                .advice(retryAdvice)))
        .enrichHeaders(s -> s.header(HEADER.feed.name(), feed))//
        .filter(selector)//
        .handle(fcHandler)//
        .handle(fileValidationHandler)//
        .channel(ftbSubscriber)//
        .get();

this.flowContext.registration(flow).addBean(csf)//
        .id(inboundFlow.get(feed.getId()))//
        .autoStartup(false).register();

I am trying to remove it via

flowContext.remove(flowId);

Even after removal, the poller and adapter still appear to be active:

java.lang.IllegalStateException: failed to create FTPClient
at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer.synchronizeToLocalDirectory(AbstractInboundFileSynchronizer.java:275)
at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizingMessageSource.doReceive(AbstractInboundFileSynchronizingMessageSource.java:200)
at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizingMessageSource.doReceive(AbstractInboundFileSynchronizingMessageSource.java:62)
at org.springframework.integration.endpoint.AbstractMessageSource.receive(AbstractMessageSource.java:134)
at org.springframework.integration.endpoint.SourcePollingChannelAdapter.receiveMessage(SourcePollingChannelAdapter.java:224)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:245)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.access$000(AbstractPollingEndpoint.java:58)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:190)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:186)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:333)
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:190)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157)
at org.springframework.integration.handler.advice.AbstractRequestHandlerAdvice.invoke(AbstractRequestHandlerAdvice.java:65)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:213)
at com.sun.proxy.$Proxy188.call(Unknown Source)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller$1.run(AbstractPollingEndpoint.java:353)
at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:55)
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:51)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:344)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

*After Gary's comments* I changed the order of the chain to match his example and removed autoStartup from the flowContext registration chain, and now the polling adapter appears to get removed. It looks like the bug is still there if autoStartup is true, though.

   this.flowContext.registration(flow).//
        id(inboundFlow.get(feed.getId()))//
        .addBean(sessionFactory.get(feed.getId()), csf)//
        .register();

*Researching more* StandardIntegrationFlow.start() starts all the components inside the flow irrespective of their autoStartup status. I guess we need to check isAutoStartup() for these as well, and only start them if autoStartup is true when starting the IntegrationFlow. The existing code of StandardIntegrationFlow is below. Is there a way to override this, or does this need a PR or fix?

if (!this.running) {
    ListIterator<Object> iterator = this.integrationComponents.listIterator(this.integrationComponents.size());
    this.lifecycles.clear();
    while (iterator.hasPrevious()) {
        Object component = iterator.previous();
        if (component instanceof SmartLifecycle) {
            this.lifecycles.add((SmartLifecycle) component);
            ((SmartLifecycle) component).start();
        }
    }
    this.running = true;
}
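
The check proposed above can be sketched in isolation. This is a minimal stand-alone model, not Spring's code: Component, SimpleComponent and FlowModel are hypothetical stand-ins for SmartLifecycle and StandardIntegrationFlow, and the only behavioral change from the loop quoted above is the isAutoStartup() guard.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.ListIterator;

// Hypothetical stand-in for Spring's SmartLifecycle (only the methods used here).
interface Component {
    void start();
    boolean isAutoStartup();
    boolean isRunning();
}

// Trivial component that records whether it has been started.
class SimpleComponent implements Component {
    private final boolean autoStartup;
    private boolean running;

    SimpleComponent(boolean autoStartup) {
        this.autoStartup = autoStartup;
    }

    @Override
    public void start() {
        this.running = true;
    }

    @Override
    public boolean isAutoStartup() {
        return this.autoStartup;
    }

    @Override
    public boolean isRunning() {
        return this.running;
    }
}

// Hypothetical model of a flow that walks its components in reverse order,
// as in the quoted loop, but skips components whose autoStartup flag is false.
class FlowModel {
    private final List<Component> components;
    private final List<Component> lifecycles = new ArrayList<>();
    private boolean running;

    FlowModel(List<Component> components) {
        this.components = components;
    }

    void start() {
        if (!this.running) {
            ListIterator<Component> iterator = this.components.listIterator(this.components.size());
            this.lifecycles.clear();
            while (iterator.hasPrevious()) {
                Component component = iterator.previous();
                this.lifecycles.add(component);
                if (component.isAutoStartup()) { // the proposed extra check
                    component.start();
                }
            }
            this.running = true;
        }
    }
}
```

With a flow built from one component created with autoStartup false and one with true, calling start() on the model leaves the first stopped and starts the second.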

Upvotes: 1

Views: 2640

Answers (1)

Gary Russell

Reputation: 174534

remove() should shut everything down. If you are using CachingSessionFactory we need to destroy() it, so it closes the cached sessions.

The flow will automatically destroy() the bean if you add it to the registration (using addBean()).

If you can edit your question to show your dynamic registration code, I can take a look.

EDIT

Everything works fine for me...

@SpringBootApplication
public class So43916317Application implements CommandLineRunner {

    public static void main(String[] args) {
        SpringApplication.run(So43916317Application.class, args).close();
    }

    @Autowired
    private IntegrationFlowContext context;

    @Override
    public void run(String... args) throws Exception {
        CSF csf = new CSF(sf());
        IntegrationFlow flow = IntegrationFlows.from(Ftp.inboundAdapter(csf)
                    .localDirectory(new File("/tmp/foo"))
                    .remoteDirectory("bar"), e -> e.poller(Pollers.fixedDelay(1_000)))
                .handle(System.out::println)
                .get();
        this.context.registration(flow)
            .id("foo")
            .addBean(csf)
            .register();
        Thread.sleep(10_000);
        System.out.println("removing flow");
        this.context.remove("foo");
        System.out.println("destroying csf");
        csf.destroy();
        Thread.sleep(10_000);
        System.out.println("exiting");
        Assert.state(csf.destroyCalled, "destroy not called");
    }

    @Bean
    public DefaultFtpSessionFactory sf() {
        DefaultFtpSessionFactory sf = new DefaultFtpSessionFactory();
        sf.setHost("10.0.0.3");
        sf.setUsername("ftptest");
        sf.setPassword("ftptest");
        return sf;
    }

    public static class CSF extends CachingSessionFactory<FTPFile> {

        private boolean destroyCalled;

        public CSF(SessionFactory<FTPFile> sessionFactory) {
            super(sessionFactory);
        }

        @Override
        public void destroy() {
            this.destroyCalled = true;
            super.destroy();
        }

    }

}

log...

16:15:38.898 [task-scheduler-5] DEBUG o.s.i.f.i.FtpInboundFileSynchronizer - 0 files transferred
16:15:38.898 [task-scheduler-5] DEBUG o.s.i.e.SourcePollingChannelAdapter - Received no Message during the poll, returning 'false'
16:15:39.900 [task-scheduler-3] DEBUG o.s.integration.util.SimplePool - Obtained org.springframework.integration.ftp.session.FtpSession@149a806 from pool.
16:15:39.903 [task-scheduler-3] DEBUG o.s.i.f.r.s.CachingSessionFactory - Releasing Session org.springframework.integration.ftp.session.FtpSession@149a806 back to the pool.
16:15:39.903 [task-scheduler-3] DEBUG o.s.integration.util.SimplePool - Releasing org.springframework.integration.ftp.session.FtpSession@149a806 back to the pool
16:15:39.903 [task-scheduler-3] DEBUG o.s.i.f.i.FtpInboundFileSynchronizer - 0 files transferred
16:15:39.903 [task-scheduler-3] DEBUG o.s.i.e.SourcePollingChannelAdapter - Received no Message during the poll, returning 'false'
removing flow
16:15:40.756 [main] INFO  o.s.i.e.SourcePollingChannelAdapter - stopped org.springframework.integration.config.SourcePollingChannelAdapterFactoryBean#0
16:15:40.757 [main] INFO  o.s.i.channel.DirectChannel - Channel 'application.foo.channel#0' has 0 subscriber(s).
16:15:40.757 [main] INFO  o.s.i.endpoint.EventDrivenConsumer - stopped org.springframework.integration.config.ConsumerEndpointFactoryBean#0
16:15:40.757 [main] DEBUG o.s.b.f.s.DefaultListableBeanFactory - Retrieved dependent beans for bean 'foo': [org.springframework.integration.ftp.inbound.FtpInboundFileSynchronizer#0, org.springframework.integration.config.SourcePollingChannelAdapterFactoryBean#0, org.springframework.integration.config.SourcePollingChannelAdapterFactoryBean#0.source, foo.channel#0, com.example.So43916317Application$$Lambda$12/962287291#0, org.springframework.integration.config.ConsumerEndpointFactoryBean#0, foocom.example.So43916317Application$CSF#0]
destroying csf
16:15:40.757 [main] DEBUG o.s.integration.util.SimplePool - Removing org.springframework.integration.ftp.session.FtpSession@149a806 from the pool
exiting
16:15:50.761 [main] TRACE o.s.c.a.AnnotationConfigApplicationContext - Publishing event in org.springframework.context.annotation.AnnotationConfigApplicationContext@27c86f2d: org.springframework.boot.context.event.ApplicationReadyEvent[source=org.springframework.boot.SpringApplication@5c18016b]

As you can see, the polling stops after the remove() and the session is closed by the destroy().

EDIT2

If you have auto start turned off you have to start via the registration...

IntegrationFlowRegistration registration = this.context.registration(flow)
    .id("foo")
    .addBean(csf)
    .autoStartup(false)
    .register();
...
registration.start();
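
Putting the pieces above together, a credentials refresh would amount to: remove the flow by id, make sure the old session factory is destroyed, then register a new flow with a new session factory. The following is a minimal stand-alone sketch of that sequence, not Spring code. ModelSessionFactory and FlowRegistryModel are hypothetical stand-ins for CachingSessionFactory and IntegrationFlowContext, destroy() only sets a flag where the real factory would close its cached sessions, and it is an assumption that the same id can be reused after removal.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a session factory that holds cached sessions.
class ModelSessionFactory {
    private final String username;
    private boolean destroyed;

    ModelSessionFactory(String username) {
        this.username = username;
    }

    String getUsername() {
        return this.username;
    }

    // The real factory would close its cached sessions here.
    void destroy() {
        this.destroyed = true;
    }

    boolean isDestroyed() {
        return this.destroyed;
    }
}

// Hypothetical registry: one factory per flow id. remove() destroys the
// factory that was registered with the flow, mirroring what adding the
// factory to the registration is described as doing above.
class FlowRegistryModel {
    private final Map<String, ModelSessionFactory> factories = new ConcurrentHashMap<>();

    void register(String flowId, ModelSessionFactory factory) {
        if (this.factories.putIfAbsent(flowId, factory) != null) {
            throw new IllegalStateException("flow already registered: " + flowId);
        }
    }

    void remove(String flowId) {
        ModelSessionFactory old = this.factories.remove(flowId);
        if (old != null) {
            old.destroy();
        }
    }

    // Refresh = remove the old flow (destroying its factory), then register
    // the replacement under the same id.
    void refresh(String flowId, ModelSessionFactory replacement) {
        remove(flowId);
        register(flowId, replacement);
    }

    ModelSessionFactory current(String flowId) {
        return this.factories.get(flowId);
    }
}
```

In a real application the same order would presumably apply: context.remove(id) first, then a new registration with the new factory added via addBean().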

Upvotes: 1
