Reputation: 261
This is my scenario: I get every month a file to my.sftp.server.de
with event data as JSON. The file has always exactly the same name. I know when the file is provided.
The file then shall be fetched, processed, removed and backuped with another filename showing the timestamp of procession. This is done with a subscribed MessageHandler deleteLocalFileService
.
Here are my Spring Integration classes:
@Autowired
private Executor taskExecutor;
private ConcurrentMap<String, String> metadata = new ConcurrentHashMap<>();
@Bean
public SessionFactory<LsEntry> sftpSessionFactory(TransferChannel transferChannel) {
LOG.debug("sftpSessionFactory");
DefaultSftpSessionFactory sf = new DefaultSftpSessionFactory();
...
return new CachingSessionFactory<LsEntry>(sf);
}
@Bean
public IntegrationFlow sftpInboundFlow(TransferContext context) {
LOG.debug("sftpInboundFlow");
return IntegrationFlows
.from(Sftp.inboundAdapter(sftpSessionFactory(context.getChannel()))
.preserveTimestamp(context.isPreserveTimestamp())
.remoteDirectory(context.getRemoteDir())
.regexFilter(context.getRemoteFilename())
.deleteRemoteFiles(context.isRemoteRemove())
.autoCreateLocalDirectory(context.isAutoCreateLocalDir())
.localFilenameExpression(context.getLocalFilename())
.localDirectory(new File(context.getLocalDir()))
.localFilter(new FileSystemPersistentAcceptOnceFileListFilter(
new SimpleMetadataStore(metadata), "")),
e -> e.id("sftpInboundAdapter")
.autoStartup(false)
.poller(Pollers.fixedDelay(15000).taskExecutor(taskExecutor))
)
.transform(Transformers.fromJson(Event[].class))
.channel("sftpInputChannel")
.get();
}
@Bean
public PublishSubscribeChannel sftpInputChannel() {
LOG.debug("sftpInputChannel");
return new PublishSubscribeChannel();
}
@Bean
@ServiceActivator(inputChannel = "sftpInputChannel")
public MessageHandler handler() {
LOG.debug("MessageHandler-handler");
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
LOG.debug("handleMessage message={}", message);
}
};
}
@Bean
@ServiceActivator(inputChannel = "sftpInputChannel")
public MessageHandler deleteLocalFileService() {
LOG.debug("MessageHandler-deleteLocalFileService");
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
File f1 = (File) message.getHeaders().get("file_originalFile");
try {
// create backup copy in another directory
// (otherwise the remote and local filters avoid fetch of the (new) file again)
File f2 = new File("/home/app/sftp-inbound-work/EventData.txt.SAVE." +
LocalDateTime.now().format(ISO_LOCAL_DATE_TIME));
FileUtil.copyFile(f1, f2);
LOG.debug("deleteLocalFileService backup={} created", f2);
// remove received file
if (!f1.delete()) {
throw new MessagingException("Could not remove local file, " + f1.getName());
};
metadata.clear();
LOG.debug("deleteLocalFileService file={} removed", f1);
}
catch (IOException io) {
LOG.error("Exception copying local file {}", io);
throw new MessagingException("Could not backup local file, " + f1.getName());
}
}
};
}
@Bean
public MessageChannel controlChannel() {
LOG.debug("controlChannel");
return new DirectChannel();
}
@Bean
@ServiceActivator(inputChannel = "controlChannel")
public ExpressionControlBusFactoryBean controlBus() {
LOG.debug("controlBus");
ExpressionControlBusFactoryBean expressionControlBusFactoryBean = new ExpressionControlBusFactoryBean();
return expressionControlBusFactoryBean;
}
Because I know when the event data file is provided from a external source, I have a method runAsTask
which is executed as Spring Scheduler Task. The method adds a ChannelInterceptor to get the content of the file, then starts the Spring Integration ensemble via a ControlChannel message and finally get and return the event data from the received file.
Here is the task method with its heart: the getFile()
method:
private static final int FILE_GET_TIMEOUT_SEC = 30;
@Autowired
AbstractMessageChannel sftpInputChannel;
@Autowired
MessageChannel controlChannel;
@Scheduled(cron = "${com.harry.potter.job.tasks.harry-potter-events-task.frequency}")
public void runAsTask() {
if (env.isEnabled()) {
LOG.info("runAsTask: Storing Event Data, frequency={}", env.getFrequency());
jobLOG.info("---------------------------------------------------------------");
jobLOG.info("Job started: Storing Event Data");
} else {
LOG.debug("runAsTask");
LOG.debug("runAsTask {} not enabled: Storing Event Data, aborted.", jobId);
return;
}
try {
events = getFile();
/* processing of the event data */
jobLOG.info("Okay, send okay mail");
sendOkayMail();
} catch (Exception e) {
LOG.error("{} failed! Exception: {}", jobId, e);
jobLOG.info("Job has errors, send error mail");
sendErrorMail();
} finally {
LOG.info("{} finished", jobId);
jobLOG.info("Job finished");
}
}
public List<Event> getFile() {
LOG.debug("getFile");
final List<Event> events = new ArrayList<>();
CountDownLatch latch = new CountDownLatch(1);
sftpInputChannel.addInterceptor(new ChannelInterceptor() {
// capture the message and count down the latch
@Override
public void postSend(Message<?> message, MessageChannel channel, boolean sent) {
LOG.debug("postSend channel={}, sent={}", channel.toString(), sent);
LOG.trace("postSend message={}", message.getPayload());
// read and transform the content
Arrays.stream((Event[]) message.getPayload()).forEach(e -> events.add(e));
// signal: work has done
latch.countDown();
ChannelInterceptor.super.postSend(message, channel, sent);
}
});
boolean sent = controlChannel.send(new GenericMessage<>("@sftpInboundAdapter.start()"));
LOG.debug("getFile 'Start' control message sent successfully={}", sent);
try {
LOG.debug("getFile, waiting for the file to be received, timeout={}s", FILE_GET_TIMEOUT_SEC);
if (latch.await(FILE_GET_TIMEOUT_SEC, TimeUnit.SECONDS)) {
return events;
}
}
catch (InterruptedException e) {
LOG.warn("getFile, job was interrupted");
}
throw new IllegalStateException("expected file not available");
}
There are two problems:
The ChannelInterceptor
is added multiple times (see in the LOG postSend channel=bean 'sftpInputChannel' ...
). Where should I add the interceptor to just have one? Or how can I check if an interceptor already was added?
The file is fetched from SFTP just the first time (see the com.jcraft.jsch
lines in the LOG). All subsequent task cycles no more negociation with the SFTP host occured. To fetch the file every cycle I was told to use a SimpleMetadataStore
for the LocalDirectoryFilter
backed by a Map and clear this map after processing the file, which I do. What do I have to change to have the file fetched from SFTP each time the task is running?
Here is the LOG having started the task for two times:
2021-10-09 01:35:52.688 INFO [] --- [http-nio-8080-exec-220] c.h.potter.job.MyJobApplication : The following profiles are active: test,cdsclient
2021-10-09 01:35:53.960 WARN [] --- [http-nio-8080-exec-220] o.s.boot.actuate.endpoint.EndpointId : Endpoint ID 'service-registry' contains invalid characters, please migrate to a valid format.
2021-10-09 01:35:54.141 INFO [] --- [http-nio-8080-exec-220] o.s.cloud.context.scope.GenericScope : BeanFactory id=301a6a9f-3809-32e6-b15b-e640dd198297
2021-10-09 01:35:54.149 INFO [] --- [http-nio-8080-exec-220] faultConfiguringBeanFactoryPostProcessor : No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2021-10-09 01:35:54.155 INFO [] --- [http-nio-8080-exec-220] faultConfiguringBeanFactoryPostProcessor : No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created.
2021-10-09 01:35:54.160 INFO [] --- [http-nio-8080-exec-220] faultConfiguringBeanFactoryPostProcessor : No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2021-10-09 01:35:54.282 INFO [] --- [http-nio-8080-exec-220] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.integration.config.IntegrationManagementConfiguration' of type [org.springframework.integration.config.IntegrationManagementConfiguration] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-10-09 01:35:54.300 INFO [] --- [http-nio-8080-exec-220] trationDelegate$BeanPostProcessorChecker : Bean 'integrationChannelResolver' of type [org.springframework.integration.support.channel.BeanFactoryChannelResolver] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-10-09 01:35:54.301 INFO [] --- [http-nio-8080-exec-220] trationDelegate$BeanPostProcessorChecker : Bean 'integrationDisposableAutoCreatedBeans' of type [org.springframework.integration.config.annotation.Disposables] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-10-09 01:35:54.363 INFO [] --- [http-nio-8080-exec-220] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 1668 ms
2021-10-09 01:35:54.967 WARN [] --- [http-nio-8080-exec-220] c.n.c.sources.URLConfigurationSource : No URLs will be polled as dynamic configuration sources.
2021-10-09 01:35:54.967 INFO [] --- [http-nio-8080-exec-220] c.n.c.sources.URLConfigurationSource : To enable URLs as dynamic configuration sources, define System property archaius.configurationSource.additionalUrls or make config.properties available on classpath.
2021-10-09 01:35:54.981 INFO [] --- [http-nio-8080-exec-220] c.netflix.config.DynamicPropertyFactory : DynamicPropertyFactory is initialized with configuration sources: com.netflix.config.ConcurrentCompositeConfiguration@db52778f
2021-10-09 01:35:55.158 DEBUG [] --- [http-nio-8080-exec-220] c.l.c.c.c.sftpConfiguration : sftpInputChannel
2021-10-09 01:35:55.224 DEBUG [] --- [http-nio-8080-exec-220] c.l.c.c.c.sftpConfiguration : controlChannel
2021-10-09 01:35:55.476 DEBUG [] --- [http-nio-8080-exec-220] c.l.c.c.c.sftpConfiguration : transferChannel
2021-10-09 01:35:55.477 DEBUG [] --- [http-nio-8080-exec-220] c.l.c.c.c.sftpConfiguration : transferContext
2021-10-09 01:35:55.477 DEBUG [] --- [http-nio-8080-exec-220] c.l.c.c.c.sftpConfiguration : sftpSessionFactory
2021-10-09 01:35:55.486 DEBUG [] --- [http-nio-8080-exec-220] c.l.c.c.c.sftpConfiguration : sftpInboundFlow
2021-10-09 01:35:55.587 DEBUG [] --- [http-nio-8080-exec-220] c.l.c.c.c.sftpConfiguration : MessageHandler-handler
2021-10-09 01:35:55.588 DEBUG [] --- [http-nio-8080-exec-220] c.l.c.c.c.sftpConfiguration : MessageHandler-deleteLocalFileService
2021-10-09 01:35:55.589 DEBUG [] --- [http-nio-8080-exec-220] c.l.c.c.c.sftpConfiguration : controlBus
2021-10-09 01:35:56.271 INFO [] --- [http-nio-8080-exec-220] o.s.s.c.ThreadPoolTaskScheduler : Initializing ExecutorService 'taskScheduler'
2021-10-09 01:35:56.335 INFO [] --- [http-nio-8080-exec-220] o.s.i.endpoint.EventDrivenConsumer : Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2021-10-09 01:35:56.335 INFO [] --- [http-nio-8080-exec-220] o.s.i.channel.PublishSubscribeChannel : Channel 'application-1.errorChannel' has 1 subscriber(s).
2021-10-09 01:35:56.335 INFO [] --- [http-nio-8080-exec-220] o.s.i.endpoint.EventDrivenConsumer : started bean '_org.springframework.integration.errorLogger'
2021-10-09 01:35:56.335 INFO [] --- [http-nio-8080-exec-220] o.s.i.endpoint.EventDrivenConsumer : Adding {json-to-object-transformer} as a subscriber to the 'sftpInboundFlow.channel#0' channel
2021-10-09 01:35:56.335 INFO [] --- [http-nio-8080-exec-220] o.s.integration.channel.DirectChannel : Channel 'application-1.sftpInboundFlow.channel#0' has 1 subscriber(s).
2021-10-09 01:35:56.335 INFO [] --- [http-nio-8080-exec-220] o.s.i.endpoint.EventDrivenConsumer : started bean 'sftpInboundFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'; defined in: 'class path resource [com/harry/potter/job/config/sftpConfiguration.class]'; from source: 'bean method sftpInboundFlow'
2021-10-09 01:35:56.335 INFO [] --- [http-nio-8080-exec-220] o.s.i.endpoint.EventDrivenConsumer : Adding {message-handler:sftpConfiguration.handler.serviceActivator} as a subscriber to the 'sftpInputChannel' channel
2021-10-09 01:35:56.335 INFO [] --- [http-nio-8080-exec-220] o.s.i.channel.PublishSubscribeChannel : Channel 'application-1.sftpInputChannel' has 1 subscriber(s).
2021-10-09 01:35:56.335 INFO [] --- [http-nio-8080-exec-220] o.s.i.endpoint.EventDrivenConsumer : started bean 'sftpConfiguration.handler.serviceActivator'
2021-10-09 01:35:56.335 INFO [] --- [http-nio-8080-exec-220] o.s.i.endpoint.EventDrivenConsumer : Adding {message-handler:sftpConfiguration.deleteLocalFileService.serviceActivator} as a subscriber to the 'sftpInputChannel' channel
2021-10-09 01:35:56.335 INFO [] --- [http-nio-8080-exec-220] o.s.i.channel.PublishSubscribeChannel : Channel 'application-1.sftpInputChannel' has 2 subscriber(s).
2021-10-09 01:35:56.336 INFO [] --- [http-nio-8080-exec-220] o.s.i.endpoint.EventDrivenConsumer : started bean 'sftpConfiguration.deleteLocalFileService.serviceActivator'
2021-10-09 01:35:56.336 INFO [] --- [http-nio-8080-exec-220] o.s.i.endpoint.EventDrivenConsumer : Adding {service-activator:sftpConfiguration.controlBus.serviceActivator} as a subscriber to the 'controlChannel' channel
2021-10-09 01:35:56.336 INFO [] --- [http-nio-8080-exec-220] o.s.integration.channel.DirectChannel : Channel 'application-1.controlChannel' has 1 subscriber(s).
2021-10-09 01:35:56.336 INFO [] --- [http-nio-8080-exec-220] o.s.i.endpoint.EventDrivenConsumer : started bean 'sftpConfiguration.controlBus.serviceActivator'
2021-10-09 01:35:56.344 INFO [] --- [http-nio-8080-exec-220] o.s.c.n.eureka.InstanceInfoFactory : Setting initial instance status as: STARTING
2021-10-09 01:35:56.380 INFO [] --- [http-nio-8080-exec-220] com.netflix.discovery.DiscoveryClient : Initializing Eureka in region us-east-1
2021-10-09 01:35:56.449 INFO [] --- [http-nio-8080-exec-220] c.n.d.provider.DiscoveryJerseyProvider : Using JSON encoding codec LegacyJacksonJson
2021-10-09 01:35:56.449 INFO [] --- [http-nio-8080-exec-220] c.n.d.provider.DiscoveryJerseyProvider : Using JSON decoding codec LegacyJacksonJson
2021-10-09 01:35:56.550 INFO [] --- [http-nio-8080-exec-220] c.n.d.provider.DiscoveryJerseyProvider : Using XML encoding codec XStreamXml
2021-10-09 01:35:56.550 INFO [] --- [http-nio-8080-exec-220] c.n.d.provider.DiscoveryJerseyProvider : Using XML decoding codec XStreamXml
2021-10-09 01:35:56.722 INFO [] --- [http-nio-8080-exec-220] c.n.d.s.r.aws.ConfigClusterResolver : Resolving eureka endpoints via configuration
2021-10-09 01:35:56.743 INFO [] --- [http-nio-8080-exec-220] com.netflix.discovery.DiscoveryClient : Disable delta property : false
2021-10-09 01:35:56.743 INFO [] --- [http-nio-8080-exec-220] com.netflix.discovery.DiscoveryClient : Single vip registry refresh property : null
2021-10-09 01:35:56.743 INFO [] --- [http-nio-8080-exec-220] com.netflix.discovery.DiscoveryClient : Force full registry fetch : false
2021-10-09 01:35:56.743 INFO [] --- [http-nio-8080-exec-220] com.netflix.discovery.DiscoveryClient : Application is null : false
2021-10-09 01:35:56.743 INFO [] --- [http-nio-8080-exec-220] com.netflix.discovery.DiscoveryClient : Registered Applications size is zero : true
2021-10-09 01:35:56.743 INFO [] --- [http-nio-8080-exec-220] com.netflix.discovery.DiscoveryClient : Application version is -1: true
2021-10-09 01:35:56.743 INFO [] --- [http-nio-8080-exec-220] com.netflix.discovery.DiscoveryClient : Getting all instance registry info from the eureka server
2021-10-09 01:35:56.953 INFO [] --- [http-nio-8080-exec-220] com.netflix.discovery.DiscoveryClient : The response status is 200
2021-10-09 01:35:56.955 INFO [] --- [http-nio-8080-exec-220] com.netflix.discovery.DiscoveryClient : Not registering with Eureka server per configuration
2021-10-09 01:35:56.958 INFO [] --- [http-nio-8080-exec-220] com.netflix.discovery.DiscoveryClient : Discovery Client initialized at timestamp 1633736156957 with initial instances count: 69
2021-10-09 01:35:56.959 INFO [] --- [http-nio-8080-exec-220] o.s.c.n.e.s.EurekaServiceRegistry : Registering application MyJob with eureka with status UP
2021-10-09 01:35:56.986 INFO [] --- [http-nio-8080-exec-220] c.h.potter.job.MyJobApplication : Started MyJobApplication in 5.36 seconds (JVM running for 996008.867)
2021-10-09 01:36:00.000 INFO [] --- [pool-240-thread-1] c.l.c.job.MyJobTask : runAsTask: Storing Event Data, frequency=0/30 * * * * *
2021-10-09 01:36:00.001 INFO [] --- [pool-240-thread-1] job-MyJob : ---------------------------------------------------------------
2021-10-09 01:36:00.001 INFO [] --- [pool-240-thread-1] job-MyJob : Job started: Storing Event Data
2021-10-09 01:36:00.001 DEBUG [] --- [pool-240-thread-1] c.l.c.job.MyJobTask : getFile
2021-10-09 01:36:00.031 INFO [] --- [pool-240-thread-1] o.s.i.e.SourcePollingChannelAdapter : started bean 'sftpInboundAdapter'; defined in: 'class path resource [com/harry/potter/job/config/sftpConfiguration.class]'; from source: 'bean method sftpInboundFlow'
2021-10-09 01:36:00.033 DEBUG [] --- [pool-240-thread-1] c.l.c.job.MyJobTask : getFile 'Start' control message sent successfully=true
2021-10-09 01:36:00.033 DEBUG [] --- [pool-240-thread-1] c.l.c.job.MyJobTask : getFile, waiting for the file to be received, timeout=60s
2021-10-09 01:36:00.055 INFO [] --- [pool-240-thread-2] com.jcraft.jsch : Connecting to my.sftp.server.de port 22
2021-10-09 01:36:00.057 INFO [] --- [pool-240-thread-2] com.jcraft.jsch : Connection established
...
2021-10-09 01:36:00.158 INFO [] --- [pool-240-thread-2] o.s.i.s.s.DefaultSftpSessionFactory : The authenticity of host 'my.sftp.server.de' can't be established.
RSA key fingerprint is 4d:fe:f9:35:08:20:2e:76:76:55:7a:1d:5d:5d:1c:90.
Are you sure you want to continue connecting?
2021-10-09 01:36:00.158 WARN [] --- [pool-240-thread-2] com.jcraft.jsch : Permanently added 'my.sftp.server.de' (RSA) to the list of known hosts.
...
2021-10-09 01:36:00.385 INFO [] --- [pool-240-thread-2] com.jcraft.jsch : Authentication succeeded (publickey).
2021-10-09 01:36:00.447 DEBUG [] --- [pool-240-thread-2] c.l.c.c.c.sftpConfiguration : handleMessage message=GenericMessage [payload=[Lcom.harry.potter.job.miscellaneous.Event;@24b4484b, headers={file_remoteHostPort=my.sftp.server.de:22, file_name=EventData.txt, file_remoteDirectory=/data, file_originalFile=/home/app/sftp-inbound-work/EventData.txt, id=fa7b1773-d522-9f0f-e8ed-8fcf3bebe23a, file_relativePath=EventData.txt, file_remoteFile=EventData.txt, timestamp=1633736160447}]
2021-10-09 01:36:00.447 DEBUG [] --- [pool-240-thread-2] c.l.c.c.c.sftpConfiguration : deleteLocalFileService file=/home/app/sftp-inbound-work/EventData.txt removed
2021-10-09 01:36:00.447 DEBUG [] --- [pool-240-thread-2] c.l.c.job.MyJobTask : postSend channel=bean 'sftpInputChannel'; defined in: 'class path resource [com/harry/potter/job/config/sftpConfiguration.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@1de2b65c', sent=true
2021-10-09 01:36:00.448 DEBUG [] --- [pool-240-thread-1] c.l.c.job.MyJobTask : createEvents
2021-10-09 01:36:01.384 INFO [] --- [pool-240-thread-1] job-MyJob : Okay, send okay mail
2021-10-09 01:36:01.384 DEBUG [] --- [pool-240-thread-1] c.l.c.job.MyJobTask : sendOkayMail
2021-10-09 01:36:01.512 INFO [] --- [pool-240-thread-1] c.l.c.job.MyJobTask : MyJob finished
2021-10-09 01:36:01.512 INFO [] --- [pool-240-thread-1] job-MyJob : Job finished
2021-10-09 01:36:30.001 INFO [] --- [pool-240-thread-1] c.l.c.job.MyJobTask : runAsTask: Storing Event Data, frequency=0/30 * * * * *
2021-10-09 01:36:30.001 INFO [] --- [pool-240-thread-1] job-MyJob : ---------------------------------------------------------------
2021-10-09 01:36:30.001 INFO [] --- [pool-240-thread-1] job-MyJob : Job started: Storing Event Data
2021-10-09 01:36:30.001 DEBUG [] --- [pool-240-thread-1] c.l.c.job.MyJobTask : getFile
2021-10-09 01:36:30.002 DEBUG [] --- [pool-240-thread-1] c.l.c.job.MyJobTask : getFile 'Start' control message sent successfully=true
2021-10-09 01:36:30.002 DEBUG [] --- [pool-240-thread-1] c.l.c.job.MyJobTask : getFile, waiting for the file to be received, timeout=60s
2021-10-09 01:36:30.062 DEBUG [] --- [pool-240-thread-3] c.l.c.c.c.sftpConfiguration : handleMessage message=GenericMessage [payload=[Lcom.harry.potter.job.miscellaneous.Event;@b2cd829f, headers={file_remoteHostPort=my.sftp.server.de:22, file_name=EventData.txt, file_remoteDirectory=/data, file_originalFile=/home/app/sftp-inbound-work/EventData.txt, id=e6558e48-db84-fd2f-2195-c43d3e245127, file_relativePath=EventData.txt, file_remoteFile=EventData.txt, timestamp=1633736190062}]
2021-10-09 01:36:30.062 DEBUG [] --- [pool-240-thread-3] c.l.c.c.c.sftpConfiguration : deleteLocalFileService file=/home/app/sftp-inbound-work/EventData.txt removed
2021-10-09 01:36:30.062 DEBUG [] --- [pool-240-thread-3] c.l.c.job.MyJobTask : postSend channel=bean 'sftpInputChannel'; defined in: 'class path resource [com/harry/potter/job/config/sftpConfiguration.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@1de2b65c', sent=true
2021-10-09 01:36:30.062 DEBUG [] --- [pool-240-thread-3] c.l.c.job.MyJobTask : postSend channel=bean 'sftpInputChannel'; defined in: 'class path resource [com/harry/potter/job/config/sftpConfiguration.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@1de2b65c', sent=true
2021-10-09 01:36:30.062 DEBUG [] --- [pool-240-thread-1] c.l.c.job.MyJobTask : createEvents
2021-10-09 01:36:30.394 INFO [] --- [pool-240-thread-1] job-MyJob : Okay, send okay mail
2021-10-09 01:36:30.395 DEBUG [] --- [pool-240-thread-1] c.l.c.job.MyJobTask : sendOkayMail
2021-10-09 01:36:30.457 INFO [] --- [pool-240-thread-1] c.l.c.job.MyJobTask : MyJob finished
2021-10-09 01:36:30.457 INFO [] --- [pool-240-thread-1] job-MyJob : Job finished
...
2021-10-09 01:38:43.697 INFO [] --- [Catalina-utility-2] o.s.i.e.SourcePollingChannelAdapter : stopped bean 'sftpInboundAdapter'; defined in: 'class path resource [com/harry/potter/job/config/sftpConfiguration.class]'; from source: 'bean method sftpInboundFlow'
2021-10-09 01:38:43.697 INFO [] --- [Catalina-utility-2] o.s.i.endpoint.EventDrivenConsumer : Removing {json-to-object-transformer} as a subscriber to the 'sftpInboundFlow.channel#0' channel
2021-10-09 01:38:43.697 INFO [] --- [Catalina-utility-2] o.s.integration.channel.DirectChannel : Channel 'application-1.sftpInboundFlow.channel#0' has 0 subscriber(s).
2021-10-09 01:38:43.697 INFO [] --- [Catalina-utility-2] o.s.i.endpoint.EventDrivenConsumer : stopped bean 'sftpInboundFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'; defined in: 'class path resource [com/harry/potter/job/config/sftpConfiguration.class]'; from source: 'bean method sftpInboundFlow'
2021-10-09 01:38:43.697 INFO [] --- [Catalina-utility-2] o.s.i.endpoint.EventDrivenConsumer : Removing {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2021-10-09 01:38:43.697 INFO [] --- [Catalina-utility-2] o.s.i.channel.PublishSubscribeChannel : Channel 'application-1.errorChannel' has 0 subscriber(s).
2021-10-09 01:38:43.697 INFO [] --- [Catalina-utility-2] o.s.i.endpoint.EventDrivenConsumer : stopped bean '_org.springframework.integration.errorLogger'
2021-10-09 01:38:43.697 INFO [] --- [Catalina-utility-2] o.s.i.endpoint.EventDrivenConsumer : Removing {message-handler:sftpConfiguration.handler.serviceActivator} as a subscriber to the 'sftpInputChannel' channel
2021-10-09 01:38:43.697 INFO [] --- [Catalina-utility-2] o.s.i.channel.PublishSubscribeChannel : Channel 'application-1.sftpInputChannel' has 1 subscriber(s).
2021-10-09 01:38:43.697 INFO [] --- [Catalina-utility-2] o.s.i.endpoint.EventDrivenConsumer : stopped bean 'sftpConfiguration.handler.serviceActivator'
2021-10-09 01:38:43.697 INFO [] --- [Catalina-utility-2] o.s.i.endpoint.EventDrivenConsumer : Removing {message-handler:sftpConfiguration.deleteLocalFileService.serviceActivator} as a subscriber to the 'sftpInputChannel' channel
2021-10-09 01:38:43.697 INFO [] --- [Catalina-utility-2] o.s.i.channel.PublishSubscribeChannel : Channel 'application-1.sftpInputChannel' has 0 subscriber(s).
2021-10-09 01:38:43.697 INFO [] --- [Catalina-utility-2] o.s.i.endpoint.EventDrivenConsumer : stopped bean 'sftpConfiguration.deleteLocalFileService.serviceActivator'
2021-10-09 01:38:43.697 INFO [] --- [Catalina-utility-2] o.s.i.endpoint.EventDrivenConsumer : Removing {service-activator:sftpConfiguration.controlBus.serviceActivator} as a subscriber to the 'controlChannel' channel
2021-10-09 01:38:43.697 INFO [] --- [Catalina-utility-2] o.s.integration.channel.DirectChannel : Channel 'application-1.controlChannel' has 0 subscriber(s).
2021-10-09 01:38:43.697 INFO [] --- [Catalina-utility-2] o.s.i.endpoint.EventDrivenConsumer : stopped bean 'sftpConfiguration.controlBus.serviceActivator'
2021-10-09 01:38:43.697 INFO [] --- [Catalina-utility-2] o.s.s.c.ThreadPoolTaskScheduler : Shutting down ExecutorService 'taskScheduler'
Upvotes: 0
Views: 671
Reputation: 174544
Since you need a count down latch, you need a new interceptor each time (or use an AtomicReference
with a new latch each time). You can call removeInterceptor
.
There is also a remote filter.
However, for scenarios like this, it's much easier to use an outbound gateway with a GET command rather than using the async inbound channel adapter.
EDIT
Example:
@SpringBootApplication
@EnableScheduling
public class So69511643Application {
public static void main(String[] args) {
SpringApplication.run(So69511643Application.class, args);
}
@Bean
DefaultSftpSessionFactory sf(@Value("${host}") String host, @Value("${username}") String user,
@Value("${pw}") String pw) {
DefaultSftpSessionFactory sf = new DefaultSftpSessionFactory();
sf.setHost(host);
sf.setUser(user);
sf.setPassword(pw);
sf.setAllowUnknownKeys(true);
return sf;
}
@Bean
IntegrationFlow flow(DefaultSftpSessionFactory sf) {
return IntegrationFlows.from(Gate.class)
.handle(Sftp.outboundGateway(sf, Command.GET, "payload")
.localDirectoryExpression("'/tmp'"))
.transform(new FileToStringTransformer())
.transform(Transformers.fromJson(Foo.class))
.get();
}
}
interface Gate {
Foo getFoo(String filename);
}
@Component
@DependsOn("flow")
class Scheduler {
private static final Logger log = LoggerFactory.getLogger(So69511643Application.class);
@Autowired
Gate gate;
@Scheduled(cron = "0 * * * * *")
public void sched() {
log.info("Fetched: {}", this.gate.getFoo("foo/bar.json"));
new File("/tmp/bar.json").delete();
}
}
EDIT2
Example using .route
for fetch and delete:
@SpringBootApplication
@EnableScheduling
public class So69511643Application {
public static void main(String[] args) {
SpringApplication.run(So69511643Application.class, args);
}
@Bean
DefaultSftpSessionFactory sf(@Value("${host}") String host, @Value("${username}") String user,
@Value("${pw}") String pw, @Value("${pk}") Resource pk) {
DefaultSftpSessionFactory sf = new DefaultSftpSessionFactory();
sf.setHost(host);
sf.setUser(user);
// sf.setPassword(pw);
sf.setPrivateKey(pk);
sf.setAllowUnknownKeys(true);
return sf;
}
@Bean
IntegrationFlow flow(DefaultSftpSessionFactory sf) {
return f -> f
.log()
.route(Message.class, m -> m.getHeaders().get("method", String.class), r -> r
.subFlowMapping("getFoo", f1 -> f1
.handle(Sftp.outboundGateway(sf, Command.GET, "payload")
.localDirectoryExpression("'/tmp'"))
.transform(new FileToStringTransformer())
.transform(Transformers.fromJson(Foo.class)))
.subFlowMapping("remove", f2 -> f2
.handle(Sftp.outboundGateway(sf, Command.RM, "payload"))));
}
}
@MessagingGateway(defaultRequestChannel = "flow.input",
defaultHeaders = @GatewayHeader(name = "method", expression = "#gatewayMethod.name"))
interface Gate {
Foo getFoo(String filename);
boolean remove(String filename);
}
@Component
@DependsOn("flow")
class Scheduler {
private static final Logger log = LoggerFactory.getLogger(So69511643Application.class);
@Autowired
Gate gate;
@Scheduled(cron = "0 * * * * *")
public void sched() {
log.info("Fetched: {}", this.gate.getFoo("foo/bar.json"));
new File("/tmp/bar.json").delete();
log.info("Deleted {}", this.gate.remove("foo/bar.json"));
}
}
EDIT3
The above example defines the gateway proxy using an annotation only - you can't mix configuration between annotations and DSL. The following shows the same thing with the configuration of the gateway proxy using only the DSL.
@SpringBootApplication
@EnableScheduling
public class So69511643Application {
public static void main(String[] args) {
SpringApplication.run(So69511643Application.class, args);
}
@Bean
DefaultSftpSessionFactory sf(@Value("${host}") String host, @Value("${username}") String user,
@Value("${pw}") String pw, @Value("${pk}") Resource pk) {
DefaultSftpSessionFactory sf = new DefaultSftpSessionFactory();
sf.setHost(host);
sf.setUser(user);
// sf.setPassword(pw);
sf.setPrivateKey(pk);
sf.setAllowUnknownKeys(true);
return sf;
}
@Bean
IntegrationFlow flow(DefaultSftpSessionFactory sf) {
return IntegrationFlows.from(Gate.class, g -> g
.header("method", args -> args.getMethod().getName()))
.log()
.route(Message.class, m -> m.getHeaders().get("method", String.class), r -> r
.subFlowMapping("getFoo", f1 -> f1
.handle(Sftp.outboundGateway(sf, Command.GET, "payload")
.localDirectoryExpression("'/tmp'"))
.transform(new FileToStringTransformer())
.transform(Transformers.fromJson(Foo.class)))
.subFlowMapping("remove", f2 -> f2
.handle(Sftp.outboundGateway(sf, Command.RM, "payload"))))
.get();
}
}
interface Gate {
Foo getFoo(String filename);
boolean remove(String filename);
}
@Component
@DependsOn("flow")
class Scheduler {
private static final Logger log = LoggerFactory.getLogger(So69511643Application.class);
@Autowired
Gate gate;
@Scheduled(cron = "0 * * * * *")
public void sched() {
log.info("Fetched: {}", this.gate.getFoo("foo/bar.json"));
new File("/tmp/bar.json").delete();
log.info("Deleted {}", this.gate.remove("foo/bar.json"));
}
}
Upvotes: 1