sa3ad
sa3ad

Reputation: 1

Execution process gets halted after certain period with executor still active and its queue count fixed

I've configured tcp call in spring integration with following flow: Gateway(Future call)====>Splitter(with executor)====>router(to 2 different transformers)===>Outboud-gateway===>Aggregator====>service

Aggregator is configured on receiving the reply because need to make another tcp call based on particular value received from first call, then send to service class.

I am getting problem after certain period of execution (records properly getting persisted) when everything halts, executor active count reaches max pool size and executor queue contains queued messages, and everything just halts forever and need to terminate the main process.

<int:gateway id="clientPositionsGateway" service-interface="com.example.ClientPositionsGateway" async-executor="syncExecutor">
    <int:method name="fetchClientPositions" request-channel="clientPositionsRequestChannel" reply-channel="clientPositionsResponseChannel"/>
    <int:method name="getSecurityData" request-channel="securityDataRequestChannel" reply-channel="clientPositionsResponseChannel"/>
</int:gateway>

<int:channel id="clientPositionsRequestChannel" >

</int:channel>
<int:channel id="securityDataRequestChannel" >

</int:channel>

<int:splitter input-channel="clientPositionsRequestChannel" 
        output-channel="singleClientPositionsRequestChannel" 

         />

<int:channel id="singleClientPositionsRequestChannel" >
    <int:dispatcher task-executor="taskExecutor"/>
</int:channel>

<int:recipient-list-router input-channel="singleClientPositionsRequestChannel" >
    <int:recipient channel="singleClientCommQueryChannel" />
    <int:recipient channel="singleClientTransQueryChannel" />
</int:recipient-list-router>

<int:transformer
    input-channel="singleClientTransQueryChannel"
    output-channel="transQueryHeaderEnricherRequestChannel"
    ref="dmPOSBaseTransQueryTransformer" order="2"/>

<int:header-enricher id="transQueryHeaderEnricher" input-channel="transQueryHeaderEnricherRequestChannel" output-channel="dmQueryRequestChannel" >
     <int:header name="transQueryHeader" value="TRANS_QUERY_HEADER"/>
</int:header-enricher>

<int:channel id="transQueryHeaderEnricherRequestChannel" >

</int:channel>

<int:transformer
    input-channel="singleClientCommQueryChannel"
    output-channel="dmQueryRequestChannel"
    ref="dmPOSBaseCommQueryTransformer" order="1"/>

<int:transformer
    input-channel="securityDataRequestChannel"
    output-channel="secQueryHeaderEnricherRequestChannel"
    ref="dmSECBaseQueryTransformer" />

<int:header-enricher id="secQueryHeaderEnricher" input-channel="secQueryHeaderEnricherRequestChannel" output-channel="dmQueryRequestChannel" >
     <int:header name="secQueryHeader" value="SEC_QUERY_HEADER"/>
</int:header-enricher>        

<int:channel id="singleClientCommQueryChannel" >

</int:channel>
<int:channel id="transformSecurityDataRequestChannel" />
<int:channel id="singleClientTransQueryChannel" >

</int:channel>
<int:channel id="dmQueryRequestChannel" >

</int:channel>

<int:channel id="secQueryHeaderEnricherRequestChannel" >

</int:channel>

<ip:tcp-outbound-gateway id="dmServerGateway" 
       request-channel="dmQueryRequestChannel" 
       reply-channel="dmQueryResponseChannel"
       connection-factory="csClient" 
       reply-timeout="3600000" request-timeout="3600000"
       />

<int:aggregator input-channel="dmQueryResponseChannel"  
    method="aggregateClientPositions"
    ref="clientPositionsAggregator" 
    output-channel="aggregateDataResponseChannel"
    correlation-strategy-expression="headers[id]"
    release-strategy-expression="size() == 1"
    send-partial-result-on-expiry="true" />

<int:service-activator method="createClientPosition" input-channel="aggregateDataResponseChannel" output-channel="clientPositionsResponseChannel" ref="clientPositionsService" >
</int:service-activator>

<int:channel id="dmQueryResponseChannel" >

</int:channel>
<int:channel id="securityDataResponseChannel" />
<int:channel id="aggregateDataResponseChannel" >

</int:channel>

<int:channel id="clientPositionsResponseChannel" >

Here are gateway interface methods: Future> fetchClientPositions(List clientList);
List getSecurityData(String symbol);

Upvotes: 0

Views: 575

Answers (1)

Artem Bilan
Artem Bilan

Reputation: 121427

Looks like your TCP service is bottleneck.

From other side I don't understand a bit your message flow:

  1. You have splitter, so your payload may be a List
  2. you use recipient-list-router and here duplicate each item after splitting
  3. Both recipients end up on tcp:outbound-gateway, but one by one, because both are within direct channels.
  4. tcp:outbound-gateway sends a reply to the aggregator. But the last one has strange aggregation configuration: each group contains only one message by its id, and completes as a list with one message
  5. aggregator sends reply (over service-activator) exactly to the gateway's reply-channel

So, how about other splitted items? They will be lost, because your gateway already has got reply just after first message. All other work seems redundant...

Upvotes: 0

Related Questions