constantlearner
constantlearner

Reputation: 5247

File getting deleted before processing

I have spring xd module with rabbit transport which pulls files from s3 and split line by line and delete it after processing(ExpressionAdvice) .I have around 1 million messages(lines) in my file which is in s3.The file gets downloaded to xd container box and i checked md5sum and its same and has same lines . I see only 260k odd message are coming to output channel which is processor.I am loosing around 740 messages. sometimes it random once i see all messages like 1 million in my output channel and sometimes only 250k .I am measuring this using counter for my stream.File is downloaded but i feel its getting deleted before processing all records/lines in 10 seconds, my file size is around 700Mb.Please let me know if expression advice is deleting before processing.

 module.aws-s3-source.count=1 and module.aws-s3-source.concurrency=70
    stream1 as-s3-source |processor|sink

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:int-aws="http://www.springframework.org/schema/integration/aws"
       xmlns:int-file="http://www.springframework.org/schema/integration/file"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/integration/file
         http://www.springframework.org/schema/integration/file/spring-integration-file.xsd
             http://www.springframework.org/schema/context
             http://www.springframework.org/schema/context/spring-context.xsd
         http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
        http://www.springframework.org/schema/integration/aws http://www.springframework.org/schema/integration/aws/spring-integration-aws-1.0.xsd">


    <context:property-placeholder location="classpath*:test-${region}.properties" />

    <int:poller fixed-delay="${fixedDelay}" default="true">
        <int:advice-chain>
            <ref bean="pollAdvise"/>

        </int:advice-chain>
    </int:poller>


    <bean id="pollAdvise" class="org.springframework.integration.scheduling.PollSkipAdvice">
        <constructor-arg ref="healthCheckStrategy"/>

    </bean>



    <bean id="healthCheckStrategy" class="test.ServiceHealthCheckPollSkipStrategy">
        <property name="url" value="${url}"/>
        <property name="doHealthCheck" value="${doHealthCheck}"/>
        <property name="restTemplate" ref="restTemplate"/>

    </bean>

    <bean id="restTemplate"
          class="org.springframework.web.client.RestTemplate">
        <constructor-arg ref="requestFactory"/>

    </bean>


    <bean id="requestFactory"
          class="test.BatchClientHttpRequestFactory">
        <constructor-arg ref="verifier"/>

    </bean>

    <bean id="verifier"
          class="test.NullHostnameVerifier">

    </bean>


    <bean id="encryptedDatum" class="test.EncryptedSecuredDatum"/>




    <bean id="clientConfiguration" class="com.amazonaws.ClientConfiguration">
        <property name="proxyHost" value="${proxyHost}"/>
        <property name="proxyPort" value="${proxyPort}"/>
        <property name="preemptiveBasicProxyAuth" value="false"/>
    </bean>

    <bean id="s3Operations" class="test.CustomC1AmazonS3Operations">

        <constructor-arg index="0" ref="clientConfiguration"/>
        <property name="awsEndpoint" value="s3.amazonaws.com"/>
        <property name="temporaryDirectory" value="${temporaryDirectory}"/>
        <property name="awsSecurityKey"  value=""/>
    </bean>


    <bean id="credentials" class="org.springframework.integration.aws.core.BasicAWSCredentials">

    </bean>

    <int-aws:s3-inbound-channel-adapter aws-endpoint="s3.amazonaws.com"
                                        bucket="${bucket}"
                                        s3-operations="s3Operations"
                                        credentials-ref="credentials"
                                        file-name-wildcard="${fileNameWildcard}"
                                        remote-directory="${prefix}"
                                        channel="splitChannel"
                                        local-directory="${localDirectory}"
                                        accept-sub-folders="false"
                                        delete-source-files="true"
                                        archive-bucket="${archiveBucket}"
                                        archive-directory="${archiveDirectory}">
    </int-aws:s3-inbound-channel-adapter>

    <int-file:splitter input-channel="splitChannel" output-channel="output" markers="false" charset="UTF-8">

        <int-file:request-handler-advice-chain>
            <bean class="org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice">
                <property name="onSuccessExpression" value="payload.delete()"/>
            </bean>
        </int-file:request-handler-advice-chain>

    </int-file:splitter>

    <int:channel-interceptor pattern="*" order="3">
        <bean class="org.springframework.integration.channel.interceptor.WireTap">
            <constructor-arg ref="loggingChannel" />
        </bean>
    </int:channel-interceptor>
    <int:logging-channel-adapter id="loggingChannel" log-full-message="true" level="INFO"/>

    <int:channel id="output"/>

</beans>

Update 2 :

My stream is like below aws-s3-source|processor|http-client| processor> queue:testQueue

1)Now I split the stream like below:

 aws-s3-source> queue:s3Queue

I was able to read all my 1 million messages very fast. 2)Now i added one more stream like below i see issue again was s3 stops pulling file and message are lost everytime

 queue:s3Queue>processor|http-client| processor> queue:testQueue 

3)Observation is when i add http-client this issue happens again ,i.e. some message from input source is missing.

4)Now I split the file into 125 Mb 5 files instead of 660mb one file .ie 200 k records 5 files.I don't see the issue i get all my messages

I also see lot of messages clogging in queue before http-client . I am thinking is it something to do with memory or threading inside xd?

Upvotes: 1

Views: 268

Answers (1)

Gary Russell
Gary Russell

Reputation: 174564

Please let me know if expression advice is deleting before processing.

No; the advice is an around advice around the message handler; it can't execute (evaluate the expression), until the splitter has emitted all the lines.

Is it possible the file is pulled from s3 before it's completely written?

To debug this problem, I would suggest changing the advice to send the file to another subflow and do some analysis/logging there before deleting.

Upvotes: 2

Related Questions