Zach Z
Zach Z

Reputation: 25

Spring Integration Threads Parked at Service Activator

I am having issues with threads getting parked at my Service Activators that leads to files hanging in the SftpGatewayChannel when the pool is depleted. I think it is related to the SA's having a void return, which is correct because they are only incrementing metrics.

I was able to work around the issue by adding a default-reply-timeout to the SftpGateway, but this is not ideal since there is retry advice and I don't want the threads to timeout if there is a connection issue. I would like a solution that returns the threads to the pool after a successful upload and call to the "Success" Service Activator.

<task:executor id="Tasker" rejection-policy="CALLER_RUNS" pool-size="${MaxThreads}" />

<int:channel id="SftpGatewayChannel">
    <int:dispatcher task-executor="Tasker" />
</int:channel>

<int:service-activator id="SegmentStart" input-channel="SftpGatewayChannel" ref="SftpGateway" />

<int:gateway id="SftpGateway" default-request-channel="SftpOutboundChannel" error-channel="ErrorChannel" />

<int:channel id="SftpOutboundChannel" datatype="java.lang.String,java.io.File,byte[]" />

<int-sftp:outbound-channel-adapter id="SftpOutboundAdapter"
    session-factory="SftpCachingSessionFactory" channel="SftpOutboundChannel" charset="UTF-8" >
    <int-sftp:request-handler-advice-chain>
        <ref bean="exponentialRetryAdvice" />
        <bean id="SuccessAdvice" class="org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice" >
            <property name="successChannel" ref="SuccessChannel"/>
            <property name="onSuccessExpression" value="true"/>
        </bean>
    </int-sftp:request-handler-advice-chain>
</int-sftp:outbound-channel-adapter>

<int:channel id="ErrorChannel">
    <int:interceptors>
        <int:wire-tap channel="FailureChannel" />
    </int:interceptors>
</int:channel>

<int:channel id="AttemptChannel" />
<int:channel id="SuccessChannel" />
<int:channel id="FailureChannel" />

<int:service-activator id="AttemptMetrics" input-channel="AttemptChannel"
    expression="T(MetricsCounter).addAttempt()"  />
<int:service-activator id="SuccessMetrics" input-channel="SuccessChannel"
    expression="T(MetricsCounter).addSuccesses(inputMessage.Headers.messages.size())" />
<int:service-activator id="FailureMetrics" input-channel="FailureChannel"
    expression="T(MetricsCounter).addFailures(payload.getFailedMessage().Headers.messages.size())" />

Upvotes: 1

Views: 1006

Answers (1)

Gary Russell
Gary Russell

Reputation: 174809

Yes, gateways expect a reply by default. Instead of using the default RequestReplyExchanger you could use a service-interface method with a void return void process(Message<?> m).

Alternatively, as you have done, simply add default-reply-timeout="0" on your gateway and the thread will return immediately without waiting for a reply (that will never come).

... but this is not ideal ...

The reply timeout clock only starts when the thread returns to the gateway, so it will have no impact on the downstream flow.

Upvotes: 1

Related Questions