Mukul Goel

Spring Integration: files picked up by a service before the copy is complete

Application background

There are two services in the application. Let's call them
1) Service A
2) Service B

Service A uses Spring Integration to copy files that match certain criteria from a source directory to a destination directory. The destination directory of service A is the source directory of service B.

Service B continuously polls that directory, processes each file it finds, and then moves the file to a subdirectory called "processed".

Problem:
The original problem was that while service A is still copying a file to the destination directory, service B picks up the half-copied file and processes it.

Attempted solution
See the integration-context.xml for service B below. I attached a composite filter to the inbound channel adapter and added a custom filter, LastCreatedFileListFilter, to it. This filter is modeled on the LastModifiedFileListFilter provided by spring-integration-file; it discards any file whose age (based on creation time) is less than 30 seconds.

The filter works as intended: a file is not picked up until it is 30 seconds old. The new problem is that I am also using prevent-duplicates="true". On the first poll after a file appears, the file is younger than 30 seconds, so the age filter rejects it. After 30 seconds the age filter correctly lets the file through, but by then the adapter has already recorded the file as seen, so it is rejected as a duplicate.

So I want to keep the duplicate check, but also not process a file until it has been fully copied.

I am not very familiar with Spring Integration. The two approaches I am considering are:
1) Do not mark a file as a duplicate in the scenario above until the application has actually processed it. Is this possible? If so, is it advisable?
2) In service A, first write each file under a temporary name and rename it once the copy is complete. Service B would then not pick up a file until its name matches the configured pattern. This way I would not have to rely on the file age, which is not fully reliable because the source and destination are on different networks and copy times can be long (#MurphysLaw). The problem is that I am having trouble working out how best to implement this with Spring Integration. Any guidance or suggestions?

Please refer to the integration context for service A below for the current implementation. Let me know if anything needs clarification.

Service A

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:int-file="http://www.springframework.org/schema/integration/file"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/integration
    http://www.springframework.org/schema/integration/spring-integration.xsd
    http://www.springframework.org/schema/integration/file
    http://www.springframework.org/schema/integration/file/spring-integration-file.xsd
    http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">

    <context:property-placeholder location="classpath:application.properties"/>


    <bean name="redisMetaDataStore" class="org.springframework.integration.redis.metadata.RedisMetadataStore">
        <constructor-arg ref="redisConnectionFactory" />
    </bean>

    <bean id="redisConnectionFactory" class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory">
        <property name="port" value="6379" />
    </bean>

    <int-file:inbound-channel-adapter id="filesIn"
                                      channel="fileChannel"
                                      directory="file:${input.directory}"
                                      filter="incomingCompositeFilter">

        <int:poller id="fileInboudPoller" fixed-rate="${in.interval}" time-unit="SECONDS"  />
    </int-file:inbound-channel-adapter>

    <bean id="incomingCompositeFilter" class="org.springframework.integration.file.filters.CompositeFileListFilter">
        <constructor-arg>
            <list>
                <bean id="acceptOnceFilter" class="org.springframework.integration.file.filters.FileSystemPersistentAcceptOnceFileListFilter">
                    <constructor-arg ref="redisMetaDataStore"/>
                    <constructor-arg value="*"/>
                </bean>
                <bean id="notOlderThanDateFilter" class="com.fexco.bgeadmin.file.filter.NotOlderThanDateFilter">
                    <constructor-arg value="${file.lastModified.ignoreBeforeDate}"/>
                </bean>
                <bean id="documentConfigFilter" class="com.fexco.bgeadmin.file.filter.DocumentConfigFilter">
                </bean>
            </list>
        </constructor-arg>
    </bean>

    <int:channel id="fileChannel"/>

    <int-file:outbound-channel-adapter id="save-as-file"
                                       auto-create-directory="true"
                                       channel="fileChannel"
                                       directory="file:${output.directory}"/>
</beans>

Service B

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:int-file="http://www.springframework.org/schema/integration/file"
       xmlns:batch-int="http://www.springframework.org/schema/batch-integration"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
         http://www.springframework.org/schema/beans/spring-beans.xsd
         http://www.springframework.org/schema/integration
         http://www.springframework.org/schema/integration/spring-integration.xsd
         http://www.springframework.org/schema/integration/file
         http://www.springframework.org/schema/integration/file/spring-integration-file.xsd
         http://www.springframework.org/schema/batch-integration
         http://www.springframework.org/schema/batch-integration/spring-batch-integration.xsd">

    <int:channel id="inboundFileChannel"/>
    <int:channel id="outboundJobRequestChannel"/>
    <int:channel id="jobLaunchReplyChannel"/>

    <int-file:inbound-channel-adapter id="filePoller"
                                      channel="inboundFileChannel"
                                      directory="${app.file.source}"
                                      auto-create-directory="true"
                                      prevent-duplicates="true"
                                      filter="incomingCompositeFilter">
        <int:poller fixed-rate="5000"/>
    </int-file:inbound-channel-adapter>

    <bean id="incomingCompositeFilter" class="org.springframework.integration.file.filters.CompositeFileListFilter">
        <constructor-arg>
            <list>
                <bean id="fileNameFilter" class="org.springframework.integration.file.filters.RegexPatternFileListFilter">
                    <constructor-arg value=".*\.(xls|xlsx|csv)$" />
                </bean>
                <bean id="ageFilter" class="com.fexco.bgeadmin.integration.filter.LastCreatedFileListFilter">
                    <property name="age" value="30"/>
                </bean>
            </list>
        </constructor-arg>
    </bean>

    <int:transformer input-channel="inboundFileChannel"
                     output-channel="outboundJobRequestChannel" method="toRequest">
        <bean class="com.fexco.bgeadmin.integration.FileMessageToJobRequest"/>
    </int:transformer>

    <batch-int:job-launching-gateway request-channel="outboundJobRequestChannel"
                                     reply-channel="jobLaunchReplyChannel"/>

    <int:logging-channel-adapter channel="jobLaunchReplyChannel"/>

</beans>

Answers (1)

Gary Russell

The best solution to this kind of problem is #2: A should not write the file "in place" in B's directory. Relying on the last-modified time is unreliable.

This is the standard behavior of the <int-file:outbound-channel-adapter/>, which uses a FileWritingMessageHandler under the hood. The handler has a temporaryFileSuffix property, which defaults to .writing: the file is written under the temporary name and renamed once the copy is complete.
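A minimal sketch of how this fits the two contexts in the question, assuming the file namespace's temporary-file-suffix attribute on the outbound adapter (the value shown is the default, so the attribute only makes the behavior explicit). Because service B's RegexPatternFileListFilter only accepts names ending in .xls, .xlsx or .csv, an in-progress file such as the hypothetical report.csv.writing should never match, so in principle the age filter can be dropped and prevent-duplicates kept.

```xml
<!-- Service A: each file is written as <name>.writing and renamed
     to <name> once the copy completes. ".writing" is the default. -->
<int-file:outbound-channel-adapter id="save-as-file"
                                   auto-create-directory="true"
                                   channel="fileChannel"
                                   directory="file:${output.directory}"
                                   temporary-file-suffix=".writing"/>

<!-- Service B: the name filter alone rejects "*.writing" files, so no
     age filter is needed and prevent-duplicates can stay enabled. -->
<int-file:inbound-channel-adapter id="filePoller"
                                  channel="inboundFileChannel"
                                  directory="${app.file.source}"
                                  auto-create-directory="true"
                                  prevent-duplicates="true"
                                  filter="fileNameFilter">
    <int:poller fixed-rate="5000"/>
</int-file:inbound-channel-adapter>

<bean id="fileNameFilter" class="org.springframework.integration.file.filters.RegexPatternFileListFilter">
    <constructor-arg value=".*\.(xls|xlsx|csv)$" />
</bean>
```

Since the renamed file has a different path from the temporary one, recording the .writing name as seen does not block the final file.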
