JBStonehenge
JBStonehenge

Reputation: 261

How do I force the SftpInboundAdapter to connect and get a file every time a task is executed?

This is my scenario: every month, a file with event data as JSON is delivered to my.sftp.server.de. The file always has exactly the same name, and I know when it is provided.

The file should then be fetched, processed, removed, and backed up under another filename that contains the processing timestamp. This is done by a subscribed MessageHandler, deleteLocalFileService.

Here are my Spring Integration classes:

// Executor passed to the inbound adapter's poller in sftpInboundFlow.
@Autowired
private Executor taskExecutor;

// Backing map of the SimpleMetadataStore used by the local file filter in sftpInboundFlow;
// deleteLocalFileService clears it after a received file has been backed up and removed.
private ConcurrentMap<String, String> metadata = new ConcurrentHashMap<>();

/**
 * Creates the SFTP session factory bean.
 *
 * <p>A {@code DefaultSftpSessionFactory} is created, configured (the configuration lines are
 * elided in this snippet) and wrapped in a {@code CachingSessionFactory}.
 *
 * @param transferChannel presumably the source of the connection settings; the elided lines
 *        would confirm how it is used
 * @return the caching session factory wrapping the default SFTP session factory
 */
@Bean
public SessionFactory<LsEntry> sftpSessionFactory(TransferChannel transferChannel) {
    LOG.debug("sftpSessionFactory");
    DefaultSftpSessionFactory sf = new DefaultSftpSessionFactory();
    ...
    return new CachingSessionFactory<LsEntry>(sf);
}

/**
 * Declares the inbound SFTP flow.
 *
 * <p>The inbound adapter is registered under the id {@code sftpInboundAdapter} with
 * auto-startup disabled, so it only polls after it has been started explicitly. Received
 * files are transformed from JSON into an {@code Event[]} and published on
 * {@code sftpInputChannel}.
 *
 * <p>NOTE(review): only the local filter is backed by the clearable {@code metadata} map;
 * whether the remote-side filtering also remembers already fetched files should be confirmed.
 *
 * @param context transfer settings (remote/local directories, file names, flags)
 * @return the assembled integration flow
 */
@Bean
public IntegrationFlow sftpInboundFlow(TransferContext context) {
    LOG.debug("sftpInboundFlow");
    SessionFactory<LsEntry> sessionFactory = sftpSessionFactory(context.getChannel());
    return IntegrationFlows
        .from(
            Sftp.inboundAdapter(sessionFactory)
                .preserveTimestamp(context.isPreserveTimestamp())
                .remoteDirectory(context.getRemoteDir())
                .regexFilter(context.getRemoteFilename())
                .deleteRemoteFiles(context.isRemoteRemove())
                .autoCreateLocalDirectory(context.isAutoCreateLocalDir())
                .localFilenameExpression(context.getLocalFilename())
                .localDirectory(new File(context.getLocalDir()))
                // accept-once filter whose state lives in the clearable metadata map
                .localFilter(new FileSystemPersistentAcceptOnceFileListFilter(
                        new SimpleMetadataStore(metadata), "")),
            adapter -> adapter
                .id("sftpInboundAdapter")
                .autoStartup(false)
                .poller(Pollers.fixedDelay(15000).taskExecutor(taskExecutor)))
        .transform(Transformers.fromJson(Event[].class))
        .channel("sftpInputChannel")
        .get();
}

/**
 * Publish-subscribe channel that receives the transformed event data.
 *
 * @return a new {@code PublishSubscribeChannel}
 */
@Bean
public PublishSubscribeChannel sftpInputChannel() {
    LOG.debug("sftpInputChannel");
    PublishSubscribeChannel inputChannel = new PublishSubscribeChannel();
    return inputChannel;
}

/**
 * Subscriber on {@code sftpInputChannel} that only logs each received message.
 *
 * @return the logging message handler
 */
@Bean
@ServiceActivator(inputChannel = "sftpInputChannel")
public MessageHandler handler() {
    LOG.debug("MessageHandler-handler");
    // MessageHandler has a single abstract method, so a lambda replaces the anonymous class.
    return message -> LOG.debug("handleMessage message={}", message);
}

/**
 * Subscriber on {@code sftpInputChannel} that backs up and then removes the received local file.
 *
 * <p>The file is taken from the {@code file_originalFile} header, copied to a timestamped
 * {@code .SAVE.} file and deleted. After a successful delete the filter metadata is cleared so
 * that a file with the same name can pass the local filter again.
 *
 * @return the message handler performing backup and removal
 */
@Bean
@ServiceActivator(inputChannel = "sftpInputChannel")
public MessageHandler deleteLocalFileService() {
    LOG.debug("MessageHandler-deleteLocalFileService");
    return new MessageHandler() {
        @Override
        public void handleMessage(Message<?> message) throws MessagingException {
            File f1 = (File) message.getHeaders().get("file_originalFile");
            if (f1 == null) {
                // Fail with a descriptive error instead of a NullPointerException further down.
                throw new MessagingException(message, "Header file_originalFile is missing");
            }
            try {
                // create a backup copy under a different, timestamped name
                // (otherwise the remote and local filters avoid fetch of the (new) file again)
                File f2 = new File("/home/app/sftp-inbound-work/EventData.txt.SAVE." +
                        LocalDateTime.now().format(ISO_LOCAL_DATE_TIME));
                FileUtil.copyFile(f1, f2);
                LOG.debug("deleteLocalFileService backup={} created", f2);

                // remove received file
                if (!f1.delete()) {
                    throw new MessagingException("Could not remove local file, " + f1.getName());
                }
                metadata.clear();
                LOG.debug("deleteLocalFileService file={} removed", f1);
            }
            catch (IOException io) {
                // The file fills the placeholder; the exception goes last so its stack trace is logged.
                LOG.error("Exception copying local file {}", f1, io);
                // Keep the original failure as the cause for callers and error channels.
                throw new MessagingException("Could not backup local file, " + f1.getName(), io);
            }
        }
    };
}

/**
 * Channel on which control-bus commands are sent.
 *
 * @return a new {@code DirectChannel}
 */
@Bean
public MessageChannel controlChannel() {
    LOG.debug("controlChannel");
    MessageChannel commandChannel = new DirectChannel();
    return commandChannel;
}

/**
 * Control bus subscribed to {@code controlChannel}.
 *
 * @return the factory bean that creates the expression control bus
 */
@Bean
@ServiceActivator(inputChannel = "controlChannel")
public ExpressionControlBusFactoryBean controlBus() {
    LOG.debug("controlBus");
    return new ExpressionControlBusFactoryBean();
}

Because I know when the event data file is provided by an external source, I have a method runAsTask which is executed as a Spring scheduled task. The method adds a ChannelInterceptor to capture the content of the file, then starts the Spring Integration ensemble via a control channel message, and finally gets and returns the event data from the received file.

Here is the task method with its heart: the getFile() method:

// Maximum time, in seconds, that getFile() waits for the file to arrive.
private static final int FILE_GET_TIMEOUT_SEC = 30;

// Channel carrying the transformed event data; getFile() attaches its interceptor here.
@Autowired
AbstractMessageChannel sftpInputChannel;

// Channel used by getFile() to send the control-bus start command.
@Autowired
MessageChannel controlChannel;

/**
 * Scheduled entry point of the job: fetches the event data file and processes it.
 *
 * <p>Does nothing but log when the job is disabled. On success an okay mail is sent; on any
 * exception the failure is logged and an error mail is sent. Completion is always logged.
 */
@Scheduled(cron = "${com.harry.potter.job.tasks.harry-potter-events-task.frequency}")
public void runAsTask() {

    if (!env.isEnabled()) {
        LOG.debug("runAsTask");
        LOG.debug("runAsTask {} not enabled: Storing Event Data, aborted.", jobId);
        return;
    }

    LOG.info("runAsTask: Storing Event Data, frequency={}", env.getFrequency());
    jobLOG.info("---------------------------------------------------------------");
    jobLOG.info("Job started: Storing Event Data");

    try {
        events = getFile();

        /* processing of the event data */

        jobLOG.info("Okay, send okay mail");
        sendOkayMail();

    } catch (Exception e) {
        // The exception is the trailing argument, so the logger prints its stack trace.
        // A placeholder for it would never be filled.
        LOG.error("{} failed!", jobId, e);
        jobLOG.info("Job has errors, send error mail");
        sendErrorMail();
    } finally {
        LOG.info("{} finished", jobId);
        jobLOG.info("Job finished");
    }
}

/**
 * Starts the SFTP inbound adapter and waits for the event data of the received file.
 *
 * <p>A one-shot interceptor is attached to {@code sftpInputChannel} for the duration of this
 * call only. It captures the {@code Event[]} payload of the first message and releases the
 * waiting thread. The interceptor is always removed again, so repeated calls do not
 * accumulate interceptors on the channel.
 *
 * @return the events read from the received file
 * @throws IllegalStateException if no file arrives within {@code FILE_GET_TIMEOUT_SEC} seconds
 *         or the waiting thread is interrupted
 */
public List<Event> getFile() {
    LOG.debug("getFile");

    final List<Event> events = new ArrayList<>();
    final CountDownLatch latch = new CountDownLatch(1);

    // Held in a variable so that exactly this instance can be removed afterwards.
    ChannelInterceptor interceptor = new ChannelInterceptor() {
        // capture the message and count down the latch
        @Override
        public void postSend(Message<?> message, MessageChannel channel, boolean sent) {
            LOG.debug("postSend channel={}, sent={}", channel.toString(), sent);
            LOG.trace("postSend message={}", message.getPayload());

            // Only the first message is captured; once the latch is released the caller may
            // already be reading the list, so it must not be modified any more.
            if (latch.getCount() > 0) {
                // read and transform the content
                events.addAll(Arrays.asList((Event[]) message.getPayload()));
            }

            // signal: work is done
            latch.countDown();
            ChannelInterceptor.super.postSend(message, channel, sent);
        }
    };

    sftpInputChannel.addInterceptor(interceptor);
    try {
        boolean sent = controlChannel.send(new GenericMessage<>("@sftpInboundAdapter.start()"));
        LOG.debug("getFile 'Start' control message sent successfully={}", sent);

        LOG.debug("getFile, waiting for the file to be received, timeout={}s", FILE_GET_TIMEOUT_SEC);
        if (latch.await(FILE_GET_TIMEOUT_SEC, TimeUnit.SECONDS)) {
            return events;
        }
    }
    catch (InterruptedException e) {
        // Restore the interrupt flag so the scheduler thread can still observe the interruption.
        Thread.currentThread().interrupt();
        LOG.warn("getFile, job was interrupted");
    }
    finally {
        // Without this, every task run would leave one more interceptor on the channel.
        sftpInputChannel.removeInterceptor(interceptor);
    }

    throw new IllegalStateException("expected file not available");
}

There are two problems:

  1. The ChannelInterceptor is added multiple times (see the repeated "postSend channel=bean 'sftpInputChannel' ..." lines in the LOG). Where should I add the interceptor so that there is just one? Or how can I check whether an interceptor has already been added?

  2. The file is fetched from SFTP only the first time (see the com.jcraft.jsch lines in the LOG). In all subsequent task cycles no further negotiation with the SFTP host occurred. To fetch the file every cycle I was told to use a SimpleMetadataStore backed by a Map for the local directory filter and to clear this map after processing the file, which I do. What do I have to change so that the file is fetched from SFTP each time the task runs?

Here is the LOG after the task was started twice:

2021-10-09 01:35:52.688  INFO [] --- [http-nio-8080-exec-220]  c.h.potter.job.MyJobApplication          : The following profiles are active: test,cdsclient
2021-10-09 01:35:53.960  WARN [] --- [http-nio-8080-exec-220]  o.s.boot.actuate.endpoint.EndpointId     : Endpoint ID 'service-registry' contains invalid characters, please migrate to a valid format.
2021-10-09 01:35:54.141  INFO [] --- [http-nio-8080-exec-220]  o.s.cloud.context.scope.GenericScope     : BeanFactory id=301a6a9f-3809-32e6-b15b-e640dd198297
2021-10-09 01:35:54.149  INFO [] --- [http-nio-8080-exec-220]  faultConfiguringBeanFactoryPostProcessor : No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2021-10-09 01:35:54.155  INFO [] --- [http-nio-8080-exec-220]  faultConfiguringBeanFactoryPostProcessor : No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created.
2021-10-09 01:35:54.160  INFO [] --- [http-nio-8080-exec-220]  faultConfiguringBeanFactoryPostProcessor : No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2021-10-09 01:35:54.282  INFO [] --- [http-nio-8080-exec-220]  trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.integration.config.IntegrationManagementConfiguration' of type [org.springframework.integration.config.IntegrationManagementConfiguration] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-10-09 01:35:54.300  INFO [] --- [http-nio-8080-exec-220]  trationDelegate$BeanPostProcessorChecker : Bean 'integrationChannelResolver' of type [org.springframework.integration.support.channel.BeanFactoryChannelResolver] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-10-09 01:35:54.301  INFO [] --- [http-nio-8080-exec-220]  trationDelegate$BeanPostProcessorChecker : Bean 'integrationDisposableAutoCreatedBeans' of type [org.springframework.integration.config.annotation.Disposables] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-10-09 01:35:54.363  INFO [] --- [http-nio-8080-exec-220]  w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 1668 ms
2021-10-09 01:35:54.967  WARN [] --- [http-nio-8080-exec-220]  c.n.c.sources.URLConfigurationSource     : No URLs will be polled as dynamic configuration sources.
2021-10-09 01:35:54.967  INFO [] --- [http-nio-8080-exec-220]  c.n.c.sources.URLConfigurationSource     : To enable URLs as dynamic configuration sources, define System property archaius.configurationSource.additionalUrls or make config.properties available on classpath.
2021-10-09 01:35:54.981  INFO [] --- [http-nio-8080-exec-220]  c.netflix.config.DynamicPropertyFactory  : DynamicPropertyFactory is initialized with configuration sources: com.netflix.config.ConcurrentCompositeConfiguration@db52778f
2021-10-09 01:35:55.158 DEBUG [] --- [http-nio-8080-exec-220]  c.l.c.c.c.sftpConfiguration              : sftpInputChannel
2021-10-09 01:35:55.224 DEBUG [] --- [http-nio-8080-exec-220]  c.l.c.c.c.sftpConfiguration              : controlChannel
2021-10-09 01:35:55.476 DEBUG [] --- [http-nio-8080-exec-220]  c.l.c.c.c.sftpConfiguration              : transferChannel
2021-10-09 01:35:55.477 DEBUG [] --- [http-nio-8080-exec-220]  c.l.c.c.c.sftpConfiguration              : transferContext
2021-10-09 01:35:55.477 DEBUG [] --- [http-nio-8080-exec-220]  c.l.c.c.c.sftpConfiguration              : sftpSessionFactory
2021-10-09 01:35:55.486 DEBUG [] --- [http-nio-8080-exec-220]  c.l.c.c.c.sftpConfiguration              : sftpInboundFlow
2021-10-09 01:35:55.587 DEBUG [] --- [http-nio-8080-exec-220]  c.l.c.c.c.sftpConfiguration              : MessageHandler-handler
2021-10-09 01:35:55.588 DEBUG [] --- [http-nio-8080-exec-220]  c.l.c.c.c.sftpConfiguration              : MessageHandler-deleteLocalFileService
2021-10-09 01:35:55.589 DEBUG [] --- [http-nio-8080-exec-220]  c.l.c.c.c.sftpConfiguration              : controlBus
2021-10-09 01:35:56.271  INFO [] --- [http-nio-8080-exec-220]  o.s.s.c.ThreadPoolTaskScheduler          : Initializing ExecutorService 'taskScheduler'
2021-10-09 01:35:56.335  INFO [] --- [http-nio-8080-exec-220]  o.s.i.endpoint.EventDrivenConsumer       : Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2021-10-09 01:35:56.335  INFO [] --- [http-nio-8080-exec-220]  o.s.i.channel.PublishSubscribeChannel    : Channel 'application-1.errorChannel' has 1 subscriber(s).
2021-10-09 01:35:56.335  INFO [] --- [http-nio-8080-exec-220]  o.s.i.endpoint.EventDrivenConsumer       : started bean '_org.springframework.integration.errorLogger'
2021-10-09 01:35:56.335  INFO [] --- [http-nio-8080-exec-220]  o.s.i.endpoint.EventDrivenConsumer       : Adding {json-to-object-transformer} as a subscriber to the 'sftpInboundFlow.channel#0' channel
2021-10-09 01:35:56.335  INFO [] --- [http-nio-8080-exec-220]  o.s.integration.channel.DirectChannel    : Channel 'application-1.sftpInboundFlow.channel#0' has 1 subscriber(s).
2021-10-09 01:35:56.335  INFO [] --- [http-nio-8080-exec-220]  o.s.i.endpoint.EventDrivenConsumer       : started bean 'sftpInboundFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'; defined in: 'class path resource [com/harry/potter/job/config/sftpConfiguration.class]'; from source: 'bean method sftpInboundFlow'
2021-10-09 01:35:56.335  INFO [] --- [http-nio-8080-exec-220]  o.s.i.endpoint.EventDrivenConsumer       : Adding {message-handler:sftpConfiguration.handler.serviceActivator} as a subscriber to the 'sftpInputChannel' channel
2021-10-09 01:35:56.335  INFO [] --- [http-nio-8080-exec-220]  o.s.i.channel.PublishSubscribeChannel    : Channel 'application-1.sftpInputChannel' has 1 subscriber(s).
2021-10-09 01:35:56.335  INFO [] --- [http-nio-8080-exec-220]  o.s.i.endpoint.EventDrivenConsumer       : started bean 'sftpConfiguration.handler.serviceActivator'
2021-10-09 01:35:56.335  INFO [] --- [http-nio-8080-exec-220]  o.s.i.endpoint.EventDrivenConsumer       : Adding {message-handler:sftpConfiguration.deleteLocalFileService.serviceActivator} as a subscriber to the 'sftpInputChannel' channel
2021-10-09 01:35:56.335  INFO [] --- [http-nio-8080-exec-220]  o.s.i.channel.PublishSubscribeChannel    : Channel 'application-1.sftpInputChannel' has 2 subscriber(s).
2021-10-09 01:35:56.336  INFO [] --- [http-nio-8080-exec-220]  o.s.i.endpoint.EventDrivenConsumer       : started bean 'sftpConfiguration.deleteLocalFileService.serviceActivator'
2021-10-09 01:35:56.336  INFO [] --- [http-nio-8080-exec-220]  o.s.i.endpoint.EventDrivenConsumer       : Adding {service-activator:sftpConfiguration.controlBus.serviceActivator} as a subscriber to the 'controlChannel' channel
2021-10-09 01:35:56.336  INFO [] --- [http-nio-8080-exec-220]  o.s.integration.channel.DirectChannel    : Channel 'application-1.controlChannel' has 1 subscriber(s).
2021-10-09 01:35:56.336  INFO [] --- [http-nio-8080-exec-220]  o.s.i.endpoint.EventDrivenConsumer       : started bean 'sftpConfiguration.controlBus.serviceActivator'
2021-10-09 01:35:56.344  INFO [] --- [http-nio-8080-exec-220]  o.s.c.n.eureka.InstanceInfoFactory       : Setting initial instance status as: STARTING
2021-10-09 01:35:56.380  INFO [] --- [http-nio-8080-exec-220]  com.netflix.discovery.DiscoveryClient    : Initializing Eureka in region us-east-1
2021-10-09 01:35:56.449  INFO [] --- [http-nio-8080-exec-220]  c.n.d.provider.DiscoveryJerseyProvider   : Using JSON encoding codec LegacyJacksonJson
2021-10-09 01:35:56.449  INFO [] --- [http-nio-8080-exec-220]  c.n.d.provider.DiscoveryJerseyProvider   : Using JSON decoding codec LegacyJacksonJson
2021-10-09 01:35:56.550  INFO [] --- [http-nio-8080-exec-220]  c.n.d.provider.DiscoveryJerseyProvider   : Using XML encoding codec XStreamXml
2021-10-09 01:35:56.550  INFO [] --- [http-nio-8080-exec-220]  c.n.d.provider.DiscoveryJerseyProvider   : Using XML decoding codec XStreamXml
2021-10-09 01:35:56.722  INFO [] --- [http-nio-8080-exec-220]  c.n.d.s.r.aws.ConfigClusterResolver      : Resolving eureka endpoints via configuration
2021-10-09 01:35:56.743  INFO [] --- [http-nio-8080-exec-220]  com.netflix.discovery.DiscoveryClient    : Disable delta property : false
2021-10-09 01:35:56.743  INFO [] --- [http-nio-8080-exec-220]  com.netflix.discovery.DiscoveryClient    : Single vip registry refresh property : null
2021-10-09 01:35:56.743  INFO [] --- [http-nio-8080-exec-220]  com.netflix.discovery.DiscoveryClient    : Force full registry fetch : false
2021-10-09 01:35:56.743  INFO [] --- [http-nio-8080-exec-220]  com.netflix.discovery.DiscoveryClient    : Application is null : false
2021-10-09 01:35:56.743  INFO [] --- [http-nio-8080-exec-220]  com.netflix.discovery.DiscoveryClient    : Registered Applications size is zero : true
2021-10-09 01:35:56.743  INFO [] --- [http-nio-8080-exec-220]  com.netflix.discovery.DiscoveryClient    : Application version is -1: true
2021-10-09 01:35:56.743  INFO [] --- [http-nio-8080-exec-220]  com.netflix.discovery.DiscoveryClient    : Getting all instance registry info from the eureka server
2021-10-09 01:35:56.953  INFO [] --- [http-nio-8080-exec-220]  com.netflix.discovery.DiscoveryClient    : The response status is 200
2021-10-09 01:35:56.955  INFO [] --- [http-nio-8080-exec-220]  com.netflix.discovery.DiscoveryClient    : Not registering with Eureka server per configuration
2021-10-09 01:35:56.958  INFO [] --- [http-nio-8080-exec-220]  com.netflix.discovery.DiscoveryClient    : Discovery Client initialized at timestamp 1633736156957 with initial instances count: 69
2021-10-09 01:35:56.959  INFO [] --- [http-nio-8080-exec-220]  o.s.c.n.e.s.EurekaServiceRegistry        : Registering application MyJob with eureka with status UP
2021-10-09 01:35:56.986  INFO [] --- [http-nio-8080-exec-220]  c.h.potter.job.MyJobApplication          : Started MyJobApplication in 5.36 seconds (JVM running for 996008.867)
2021-10-09 01:36:00.000  INFO [] --- [pool-240-thread-1]  c.l.c.job.MyJobTask                      : runAsTask: Storing Event Data, frequency=0/30 * * * * *
2021-10-09 01:36:00.001  INFO [] --- [pool-240-thread-1]  job-MyJob                                : ---------------------------------------------------------------
2021-10-09 01:36:00.001  INFO [] --- [pool-240-thread-1]  job-MyJob                                : Job started: Storing Event Data
2021-10-09 01:36:00.001 DEBUG [] --- [pool-240-thread-1]  c.l.c.job.MyJobTask                      : getFile
2021-10-09 01:36:00.031  INFO [] --- [pool-240-thread-1]  o.s.i.e.SourcePollingChannelAdapter      : started bean 'sftpInboundAdapter'; defined in: 'class path resource [com/harry/potter/job/config/sftpConfiguration.class]'; from source: 'bean method sftpInboundFlow'
2021-10-09 01:36:00.033 DEBUG [] --- [pool-240-thread-1]  c.l.c.job.MyJobTask                      : getFile 'Start' control message sent successfully=true
2021-10-09 01:36:00.033 DEBUG [] --- [pool-240-thread-1]  c.l.c.job.MyJobTask                      : getFile, waiting for the file to be received, timeout=60s
2021-10-09 01:36:00.055  INFO [] --- [pool-240-thread-2]  com.jcraft.jsch                          : Connecting to my.sftp.server.de     port 22
2021-10-09 01:36:00.057  INFO [] --- [pool-240-thread-2]  com.jcraft.jsch                          : Connection established
...
2021-10-09 01:36:00.158  INFO [] --- [pool-240-thread-2]  o.s.i.s.s.DefaultSftpSessionFactory      : The authenticity of host 'my.sftp.server.de' can't be established.
RSA key fingerprint is 4d:fe:f9:35:08:20:2e:76:76:55:7a:1d:5d:5d:1c:90.
Are you sure you want to continue connecting?
2021-10-09 01:36:00.158  WARN [] --- [pool-240-thread-2]  com.jcraft.jsch                          : Permanently added 'my.sftp.server.de' (RSA) to the list of known hosts.
...
2021-10-09 01:36:00.385  INFO [] --- [pool-240-thread-2]  com.jcraft.jsch                          : Authentication succeeded (publickey).
2021-10-09 01:36:00.447 DEBUG [] --- [pool-240-thread-2]  c.l.c.c.c.sftpConfiguration              : handleMessage message=GenericMessage [payload=[Lcom.harry.potter.job.miscellaneous.Event;@24b4484b, headers={file_remoteHostPort=my.sftp.server.de:22, file_name=EventData.txt, file_remoteDirectory=/data, file_originalFile=/home/app/sftp-inbound-work/EventData.txt, id=fa7b1773-d522-9f0f-e8ed-8fcf3bebe23a, file_relativePath=EventData.txt, file_remoteFile=EventData.txt, timestamp=1633736160447}]
2021-10-09 01:36:00.447 DEBUG [] --- [pool-240-thread-2]  c.l.c.c.c.sftpConfiguration              : deleteLocalFileService file=/home/app/sftp-inbound-work/EventData.txt removed
2021-10-09 01:36:00.447 DEBUG [] --- [pool-240-thread-2]  c.l.c.job.MyJobTask                      : postSend channel=bean 'sftpInputChannel'; defined in: 'class path resource [com/harry/potter/job/config/sftpConfiguration.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@1de2b65c', sent=true
2021-10-09 01:36:00.448 DEBUG [] --- [pool-240-thread-1]  c.l.c.job.MyJobTask                      : createEvents
2021-10-09 01:36:01.384  INFO [] --- [pool-240-thread-1]  job-MyJob                                : Okay, send okay mail
2021-10-09 01:36:01.384 DEBUG [] --- [pool-240-thread-1]  c.l.c.job.MyJobTask                      : sendOkayMail
2021-10-09 01:36:01.512  INFO [] --- [pool-240-thread-1]  c.l.c.job.MyJobTask                      : MyJob finished
2021-10-09 01:36:01.512  INFO [] --- [pool-240-thread-1]  job-MyJob                                : Job finished
2021-10-09 01:36:30.001  INFO [] --- [pool-240-thread-1]  c.l.c.job.MyJobTask                      : runAsTask: Storing Event Data, frequency=0/30 * * * * *
2021-10-09 01:36:30.001  INFO [] --- [pool-240-thread-1]  job-MyJob                                : ---------------------------------------------------------------
2021-10-09 01:36:30.001  INFO [] --- [pool-240-thread-1]  job-MyJob                                : Job started: Storing Event Data
2021-10-09 01:36:30.001 DEBUG [] --- [pool-240-thread-1]  c.l.c.job.MyJobTask                      : getFile
2021-10-09 01:36:30.002 DEBUG [] --- [pool-240-thread-1]  c.l.c.job.MyJobTask                      : getFile 'Start' control message sent successfully=true
2021-10-09 01:36:30.002 DEBUG [] --- [pool-240-thread-1]  c.l.c.job.MyJobTask                      : getFile, waiting for the file to be received, timeout=60s
2021-10-09 01:36:30.062 DEBUG [] --- [pool-240-thread-3]  c.l.c.c.c.sftpConfiguration              : handleMessage message=GenericMessage [payload=[Lcom.harry.potter.job.miscellaneous.Event;@b2cd829f, headers={file_remoteHostPort=my.sftp.server.de:22, file_name=EventData.txt, file_remoteDirectory=/data, file_originalFile=/home/app/sftp-inbound-work/EventData.txt, id=e6558e48-db84-fd2f-2195-c43d3e245127, file_relativePath=EventData.txt, file_remoteFile=EventData.txt, timestamp=1633736190062}]
2021-10-09 01:36:30.062 DEBUG [] --- [pool-240-thread-3]  c.l.c.c.c.sftpConfiguration              : deleteLocalFileService file=/home/app/sftp-inbound-work/EventData.txt removed
2021-10-09 01:36:30.062 DEBUG [] --- [pool-240-thread-3]  c.l.c.job.MyJobTask                      : postSend channel=bean 'sftpInputChannel'; defined in: 'class path resource [com/harry/potter/job/config/sftpConfiguration.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@1de2b65c', sent=true
2021-10-09 01:36:30.062 DEBUG [] --- [pool-240-thread-3]  c.l.c.job.MyJobTask                      : postSend channel=bean 'sftpInputChannel'; defined in: 'class path resource [com/harry/potter/job/config/sftpConfiguration.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@1de2b65c', sent=true
2021-10-09 01:36:30.062 DEBUG [] --- [pool-240-thread-1]  c.l.c.job.MyJobTask                      : createEvents
2021-10-09 01:36:30.394  INFO [] --- [pool-240-thread-1]  job-MyJob                                : Okay, send okay mail
2021-10-09 01:36:30.395 DEBUG [] --- [pool-240-thread-1]  c.l.c.job.MyJobTask                      : sendOkayMail
2021-10-09 01:36:30.457  INFO [] --- [pool-240-thread-1]  c.l.c.job.MyJobTask                      : MyJob finished
2021-10-09 01:36:30.457  INFO [] --- [pool-240-thread-1]  job-MyJob                                : Job finished
...
2021-10-09 01:38:43.697  INFO [] --- [Catalina-utility-2]  o.s.i.e.SourcePollingChannelAdapter      : stopped bean 'sftpInboundAdapter'; defined in: 'class path resource [com/harry/potter/job/config/sftpConfiguration.class]'; from source: 'bean method sftpInboundFlow'
2021-10-09 01:38:43.697  INFO [] --- [Catalina-utility-2]  o.s.i.endpoint.EventDrivenConsumer       : Removing {json-to-object-transformer} as a subscriber to the 'sftpInboundFlow.channel#0' channel
2021-10-09 01:38:43.697  INFO [] --- [Catalina-utility-2]  o.s.integration.channel.DirectChannel    : Channel 'application-1.sftpInboundFlow.channel#0' has 0 subscriber(s).
2021-10-09 01:38:43.697  INFO [] --- [Catalina-utility-2]  o.s.i.endpoint.EventDrivenConsumer       : stopped bean 'sftpInboundFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'; defined in: 'class path resource [com/harry/potter/job/config/sftpConfiguration.class]'; from source: 'bean method sftpInboundFlow'
2021-10-09 01:38:43.697  INFO [] --- [Catalina-utility-2]  o.s.i.endpoint.EventDrivenConsumer       : Removing {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2021-10-09 01:38:43.697  INFO [] --- [Catalina-utility-2]  o.s.i.channel.PublishSubscribeChannel    : Channel 'application-1.errorChannel' has 0 subscriber(s).
2021-10-09 01:38:43.697  INFO [] --- [Catalina-utility-2]  o.s.i.endpoint.EventDrivenConsumer       : stopped bean '_org.springframework.integration.errorLogger'
2021-10-09 01:38:43.697  INFO [] --- [Catalina-utility-2]  o.s.i.endpoint.EventDrivenConsumer       : Removing {message-handler:sftpConfiguration.handler.serviceActivator} as a subscriber to the 'sftpInputChannel' channel
2021-10-09 01:38:43.697  INFO [] --- [Catalina-utility-2]  o.s.i.channel.PublishSubscribeChannel    : Channel 'application-1.sftpInputChannel' has 1 subscriber(s).
2021-10-09 01:38:43.697  INFO [] --- [Catalina-utility-2]  o.s.i.endpoint.EventDrivenConsumer       : stopped bean 'sftpConfiguration.handler.serviceActivator'
2021-10-09 01:38:43.697  INFO [] --- [Catalina-utility-2]  o.s.i.endpoint.EventDrivenConsumer       : Removing {message-handler:sftpConfiguration.deleteLocalFileService.serviceActivator} as a subscriber to the 'sftpInputChannel' channel
2021-10-09 01:38:43.697  INFO [] --- [Catalina-utility-2]  o.s.i.channel.PublishSubscribeChannel    : Channel 'application-1.sftpInputChannel' has 0 subscriber(s).
2021-10-09 01:38:43.697  INFO [] --- [Catalina-utility-2]  o.s.i.endpoint.EventDrivenConsumer       : stopped bean 'sftpConfiguration.deleteLocalFileService.serviceActivator'
2021-10-09 01:38:43.697  INFO [] --- [Catalina-utility-2]  o.s.i.endpoint.EventDrivenConsumer       : Removing {service-activator:sftpConfiguration.controlBus.serviceActivator} as a subscriber to the 'controlChannel' channel
2021-10-09 01:38:43.697  INFO [] --- [Catalina-utility-2]  o.s.integration.channel.DirectChannel    : Channel 'application-1.controlChannel' has 0 subscriber(s).
2021-10-09 01:38:43.697  INFO [] --- [Catalina-utility-2]  o.s.i.endpoint.EventDrivenConsumer       : stopped bean 'sftpConfiguration.controlBus.serviceActivator'
2021-10-09 01:38:43.697  INFO [] --- [Catalina-utility-2]  o.s.s.c.ThreadPoolTaskScheduler          : Shutting down ExecutorService 'taskScheduler'

Upvotes: 0

Views: 671

Answers (1)

Gary Russell
Gary Russell

Reputation: 174544

  1. Since you need a count down latch, you need a new interceptor each time (or use an AtomicReference with a new latch each time). You can call removeInterceptor.

  2. There is also a remote filter.

However, for scenarios like this, it's much easier to use an outbound gateway with a GET command rather than using the async inbound channel adapter.

https://docs.spring.io/spring-integration/docs/current/reference/html/sftp.html#sftp-outbound-gateway

EDIT

Example:

/**
 * Example application: fetches a remote file on demand through an SFTP outbound gateway
 * ({@code GET} command) instead of an asynchronous inbound channel adapter.
 */
@SpringBootApplication
@EnableScheduling
public class So69511643Application {

    public static void main(String[] args) {
        SpringApplication.run(So69511643Application.class, args);
    }

    /**
     * Builds the SFTP session factory from the {@code host}, {@code username} and {@code pw}
     * properties. Unknown host keys are accepted.
     */
    @Bean
    DefaultSftpSessionFactory sf(@Value("${host}") String host, @Value("${username}") String user,
            @Value("${pw}") String pw) {

        DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory();
        factory.setHost(host);
        factory.setUser(user);
        factory.setPassword(pw);
        factory.setAllowUnknownKeys(true);
        return factory;
    }

    /**
     * Flow behind the {@code Gate} proxy: the payload names the remote file, which is
     * downloaded into {@code /tmp}, read as a string and converted from JSON to {@code Foo}.
     */
    @Bean
    IntegrationFlow flow(DefaultSftpSessionFactory sf) {
        return IntegrationFlows
                .from(Gate.class)
                .handle(
                        Sftp.outboundGateway(sf, Command.GET, "payload")
                                .localDirectoryExpression("'/tmp'"))
                .transform(new FileToStringTransformer())
                .transform(Transformers.fromJson(Foo.class))
                .get();
    }

}

/**
 * Gateway interface used as the entry point of the integration flow
 * ({@code IntegrationFlows.from(Gate.class)}).
 */
interface Gate {

    /**
     * Requests the given remote file through the flow.
     *
     * @param filename remote path of the file; becomes the message payload
     * @return the flow's reply converted to {@code Foo}
     */
    Foo getFoo(String filename);

}

/**
 * Scheduled client of the {@code Gate}: once a minute it fetches {@code foo/bar.json} and then
 * removes the local copy downloaded to {@code /tmp}.
 */
@Component
@DependsOn("flow")
class Scheduler {

    // Bound to this class so log lines are attributed to the scheduler, not the application class.
    private static final Logger log = LoggerFactory.getLogger(Scheduler.class);

    @Autowired
    Gate gate;

    /** Fetches the remote file via the gateway and deletes the local copy. */
    @Scheduled(cron = "0 * * * * *")
    public void sched() {
        log.info("Fetched: {}", this.gate.getFoo("foo/bar.json"));
        File localCopy = new File("/tmp/bar.json");
        // File.delete() reports failure only through its return value, so check it.
        if (!localCopy.delete()) {
            log.warn("Could not delete local copy {}", localCopy);
        }
    }

}

EDIT2

Example using .route for fetch and delete:

/**
 * Example application: one flow that either fetches or removes a remote file, routed by the
 * {@code method} header that the gateway sets to the invoked method's name.
 */
@SpringBootApplication
@EnableScheduling
public class So69511643Application {

    public static void main(String[] args) {
        SpringApplication.run(So69511643Application.class, args);
    }

    /**
     * Builds the SFTP session factory using private-key authentication. Unknown host keys are
     * accepted.
     */
    @Bean
    DefaultSftpSessionFactory sf(@Value("${host}") String host, @Value("${username}") String user,
            @Value("${pw}") String pw, @Value("${pk}") Resource pk) {

        DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory();
        factory.setHost(host);
        factory.setUser(user);
        // Password authentication is the alternative: factory.setPassword(pw);
        factory.setPrivateKey(pk);
        factory.setAllowUnknownKeys(true);
        return factory;
    }

    /**
     * Logs each request, then routes on the {@code method} header: {@code getFoo} downloads the
     * file named by the payload into {@code /tmp} and converts its JSON content to {@code Foo};
     * {@code remove} issues an SFTP {@code RM} for the payload path.
     */
    @Bean
    IntegrationFlow flow(DefaultSftpSessionFactory sf) {
        return definition -> definition
                .log()
                .route(Message.class,
                        request -> request.getHeaders().get("method", String.class),
                        router -> router
                                .subFlowMapping("getFoo", fetch -> fetch
                                        .handle(Sftp.outboundGateway(sf, Command.GET, "payload")
                                                .localDirectoryExpression("'/tmp'"))
                                        .transform(new FileToStringTransformer())
                                        .transform(Transformers.fromJson(Foo.class)))
                                .subFlowMapping("remove", delete -> delete
                                        .handle(Sftp.outboundGateway(sf, Command.RM, "payload"))));
    }

}

/**
 * Messaging gateway that sends requests to {@code flow.input} and adds a {@code method} header
 * holding the name of the invoked gateway method.
 */
@MessagingGateway(defaultRequestChannel = "flow.input",
        defaultHeaders = @GatewayHeader(name = "method", expression = "#gatewayMethod.name"))
interface Gate {

    /**
     * Requests the given remote file.
     *
     * @param filename remote path of the file; becomes the message payload
     * @return the flow's reply converted to {@code Foo}
     */
    Foo getFoo(String filename);

    /**
     * Requests removal of the given remote file.
     *
     * @param filename remote path of the file; becomes the message payload
     * @return the flow's reply — presumably whether the removal succeeded; confirm against the
     *         gateway documentation
     */
    boolean remove(String filename);

}

/**
 * Scheduled client of the {@code Gate}: once a minute it fetches {@code foo/bar.json}, removes
 * the local copy downloaded to {@code /tmp} and then removes the remote file.
 */
@Component
@DependsOn("flow")
class Scheduler {

    // Bound to this class so log lines are attributed to the scheduler, not the application class.
    private static final Logger log = LoggerFactory.getLogger(Scheduler.class);

    @Autowired
    Gate gate;

    /** Fetches the remote file, deletes the local copy, then deletes the remote file. */
    @Scheduled(cron = "0 * * * * *")
    public void sched() {
        log.info("Fetched: {}", this.gate.getFoo("foo/bar.json"));
        File localCopy = new File("/tmp/bar.json");
        // File.delete() reports failure only through its return value, so check it.
        if (!localCopy.delete()) {
            log.warn("Could not delete local copy {}", localCopy);
        }
        log.info("Deleted {}", this.gate.remove("foo/bar.json"));
    }

}

EDIT3

The above example defines the gateway proxy using an annotation only - you can't mix configuration between annotations and DSL. The following shows the same thing with the configuration of the gateway proxy using only the DSL.

/**
 * Example application: same fetch/remove routing flow, with the gateway proxy configured
 * entirely through the Java DSL rather than through annotations.
 */
@SpringBootApplication
@EnableScheduling
public class So69511643Application {

    public static void main(String[] args) {
        SpringApplication.run(So69511643Application.class, args);
    }

    /**
     * Builds the SFTP session factory using private-key authentication. Unknown host keys are
     * accepted.
     */
    @Bean
    DefaultSftpSessionFactory sf(@Value("${host}") String host, @Value("${username}") String user,
            @Value("${pw}") String pw, @Value("${pk}") Resource pk) {

        DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory();
        factory.setHost(host);
        factory.setUser(user);
        // Password authentication is the alternative: factory.setPassword(pw);
        factory.setPrivateKey(pk);
        factory.setAllowUnknownKeys(true);
        return factory;
    }

    /**
     * Starts from the {@code Gate} proxy, which adds a {@code method} header with the invoked
     * method's name. Each request is logged and then routed on that header: {@code getFoo}
     * downloads the file named by the payload into {@code /tmp} and converts its JSON content to
     * {@code Foo}; {@code remove} issues an SFTP {@code RM} for the payload path.
     */
    @Bean
    IntegrationFlow flow(DefaultSftpSessionFactory sf) {
        return IntegrationFlows
                .from(Gate.class, gateway -> gateway
                        .header("method", invocation -> invocation.getMethod().getName()))
                .log()
                .route(Message.class,
                        request -> request.getHeaders().get("method", String.class),
                        router -> router
                                .subFlowMapping("getFoo", fetch -> fetch
                                        .handle(Sftp.outboundGateway(sf, Command.GET, "payload")
                                                .localDirectoryExpression("'/tmp'"))
                                        .transform(new FileToStringTransformer())
                                        .transform(Transformers.fromJson(Foo.class)))
                                .subFlowMapping("remove", delete -> delete
                                        .handle(Sftp.outboundGateway(sf, Command.RM, "payload"))))
                .get();
    }

}

/**
 * Gateway interface used as the entry point of the integration flow
 * ({@code IntegrationFlows.from(Gate.class, ...)}).
 */
interface Gate {

    /**
     * Requests the given remote file.
     *
     * @param filename remote path of the file; becomes the message payload
     * @return the flow's reply converted to {@code Foo}
     */
    Foo getFoo(String filename);

    /**
     * Requests removal of the given remote file.
     *
     * @param filename remote path of the file; becomes the message payload
     * @return the flow's reply — presumably whether the removal succeeded; confirm against the
     *         gateway documentation
     */
    boolean remove(String filename);

}

/**
 * Scheduled client of the {@code Gate}: once a minute it fetches {@code foo/bar.json}, removes
 * the local copy downloaded to {@code /tmp} and then removes the remote file.
 */
@Component
@DependsOn("flow")
class Scheduler {

    // Bound to this class so log lines are attributed to the scheduler, not the application class.
    private static final Logger log = LoggerFactory.getLogger(Scheduler.class);

    @Autowired
    Gate gate;

    /** Fetches the remote file, deletes the local copy, then deletes the remote file. */
    @Scheduled(cron = "0 * * * * *")
    public void sched() {
        log.info("Fetched: {}", this.gate.getFoo("foo/bar.json"));
        File localCopy = new File("/tmp/bar.json");
        // File.delete() reports failure only through its return value, so check it.
        if (!localCopy.delete()) {
            log.warn("Could not delete local copy {}", localCopy);
        }
        log.info("Deleted {}", this.gate.remove("foo/bar.json"));
    }

}

Upvotes: 1

Related Questions