Reputation: 8467
Application background
There are two services in the application. Let's call them:
1) service A
2) service B
Service A copies files that match particular criteria from a source directory to a destination directory, using Spring Integration. The destination directory of service A is the source directory of service B.
Service B constantly polls that directory for files, processes them, and then moves them to a subdirectory called "processed".
Problem:
The original problem was that, while service A is still copying a file to the destination directory, service B picks up the half-copied file and processes it.
Tried solution
Refer to the service B integration-context.xml below. I attached a composite filter to the inbound channel adapter and added a custom filter called LastCreatedFileListFilter to it. This filter is modeled on the LastModifiedFileListFilter provided by spring-integration-file, and it discards any file whose age (by creation time) is less than 30 seconds.
This filter works perfectly and does not pick up a file until it is 30 seconds old. But the problem now is that I am using prevent-duplicates="true". The first time service B polls the folder, the file is less than 30 seconds old, so the filter rejects it. After 30 seconds the filter accepts the file, which is the correct behavior, but by then the adapter has already marked the file as seen and rejects it as a duplicate.
So my problem is that I want to keep the duplicate check and also not process a file until it is fully copied.
I am not very familiar with Spring Integration, and the two approaches I am considering are:
1) Do not mark a file as a duplicate in the above scenario until the application has processed it. Is this possible? If so, is it advisable?
2) In service A, first create files with a temporary name and rename them once the copy is complete. Service B would not pick up the files until they carry the configured name. This way I do not have to rely on the age property, which cannot be 100% decisive because the source and destination are on different networks and copy times could be large (#MurphysLaw).
But I am having trouble conceptualizing how best to achieve this with Spring Integration. Any guidance or suggestions?
Please refer to the integration context for service A for the current implementation. Let me know if anything needs clarification.
Service A
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:int-file="http://www.springframework.org/schema/integration/file"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/integration
           http://www.springframework.org/schema/integration/spring-integration.xsd
           http://www.springframework.org/schema/integration/file
           http://www.springframework.org/schema/integration/file/spring-integration-file.xsd
           http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">

    <context:property-placeholder location="classpath:application.properties"/>

    <bean name="redisMetaDataStore" class="org.springframework.integration.redis.metadata.RedisMetadataStore">
        <constructor-arg ref="redisConnectionFactory" />
    </bean>

    <bean id="redisConnectionFactory" class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory">
        <property name="port" value="6379" />
    </bean>

    <int-file:inbound-channel-adapter id="filesIn"
                                      channel="fileChannel"
                                      directory="file:${input.directory}"
                                      filter="incomingCompositeFilter">
        <int:poller id="fileInboudPoller" fixed-rate="${in.interval}" time-unit="SECONDS" />
    </int-file:inbound-channel-adapter>

    <bean id="incomingCompositeFilter" class="org.springframework.integration.file.filters.CompositeFileListFilter">
        <constructor-arg>
            <list>
                <bean id="acceptOnceFilter" class="org.springframework.integration.file.filters.FileSystemPersistentAcceptOnceFileListFilter">
                    <constructor-arg ref="redisMetaDataStore"/>
                    <constructor-arg value="*"/>
                </bean>
                <bean id="notOlderThanDateFilter" class="com.fexco.bgeadmin.file.filter.NotOlderThanDateFilter">
                    <constructor-arg value="${file.lastModified.ignoreBeforeDate}"/>
                </bean>
                <bean id="documentConfigFilter" class="com.fexco.bgeadmin.file.filter.DocumentConfigFilter">
                </bean>
            </list>
        </constructor-arg>
    </bean>

    <int:channel id="fileChannel"/>

    <int-file:outbound-channel-adapter id="save-as-file"
                                       auto-create-directory="true"
                                       channel="fileChannel"
                                       directory="file:${output.directory}"/>
</beans>
Service B
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:int-file="http://www.springframework.org/schema/integration/file"
       xmlns:batch-int="http://www.springframework.org/schema/batch-integration"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/integration
           http://www.springframework.org/schema/integration/spring-integration.xsd
           http://www.springframework.org/schema/integration/file
           http://www.springframework.org/schema/integration/file/spring-integration-file.xsd
           http://www.springframework.org/schema/batch-integration
           http://www.springframework.org/schema/batch-integration/spring-batch-integration.xsd">

    <int:channel id="inboundFileChannel"/>
    <int:channel id="outboundJobRequestChannel"/>
    <int:channel id="jobLaunchReplyChannel"/>

    <int-file:inbound-channel-adapter id="filePoller"
                                      channel="inboundFileChannel"
                                      directory="${app.file.source}"
                                      auto-create-directory="true"
                                      prevent-duplicates="true"
                                      filter="incomingCompositeFilter">
        <int:poller fixed-rate="5000"/>
    </int-file:inbound-channel-adapter>

    <bean id="incomingCompositeFilter" class="org.springframework.integration.file.filters.CompositeFileListFilter">
        <constructor-arg>
            <list>
                <bean id="fileNameFilter" class="org.springframework.integration.file.filters.RegexPatternFileListFilter">
                    <constructor-arg value=".*\.(xls|xlsx|csv)$" />
                </bean>
                <bean id="ageFilter" class="com.fexco.bgeadmin.integration.filter.LastCreatedFileListFilter">
                    <property name="age" value="30"/>
                </bean>
            </list>
        </constructor-arg>
    </bean>

    <int:transformer input-channel="inboundFileChannel"
                     output-channel="outboundJobRequestChannel" method="toRequest">
        <bean class="com.fexco.bgeadmin.integration.FileMessageToJobRequest"/>
    </int:transformer>

    <batch-int:job-launching-gateway request-channel="outboundJobRequestChannel"
                                     reply-channel="jobLaunchReplyChannel"/>

    <int:logging-channel-adapter channel="jobLaunchReplyChannel"/>
</beans>
Upvotes: 0
Views: 2589
Reputation: 174564
The best solution for such problems is #2: A should not write the file into B's directory "in place". Relying on the last modified time is unreliable.
This is the standard behavior of the <int-file:outbound-channel-adapter/>, which uses a FileWritingMessageHandler underneath. It has a property temporaryFileSuffix, which is .writing by default. The file is written under the temporary name and renamed after the copy completes.
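As a rough sketch, service A's existing outbound adapter could state the suffix explicitly. The temporary-file-suffix attribute is the XML counterpart of the temporaryFileSuffix property. The value shown is just the default, so this fragment should behave the same as the adapter already in the question; it only makes the behavior visible in the configuration.

```xml
<!-- Service A: each file is written as <name>.writing and renamed to <name>
     once the copy is complete. ".writing" is the default suffix, spelled out
     here only for clarity. -->
<int-file:outbound-channel-adapter id="save-as-file"
                                   auto-create-directory="true"
                                   channel="fileChannel"
                                   directory="file:${output.directory}"
                                   temporary-file-suffix=".writing"/>
```

An in-progress file such as report.csv.writing (a hypothetical name) does not match the pattern .*\.(xls|xlsx|csv)$ used by service B's RegexPatternFileListFilter, so B should only see the file after the rename. Whether that rename is atomic depends on the file system, which is worth checking when the directory lives on a network share.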
Upvotes: 4