Reputation: 1
I've configured tcp call in spring integration with following flow: Gateway(Future call)====>Splitter(with executor)====>router(to 2 different transformers)===>Outboud-gateway===>Aggregator====>service
Aggregator is configured on receiving the reply because need to make another tcp call based on particular value received from first call, then send to service class.
I am getting problem after certain period of execution (records properly getting persisted) when everything halts, executor active count reaches max pool size and executor queue contains queued messages, and everything just halts forever and need to terminate the main process.
<int:gateway id="clientPositionsGateway" service-interface="com.example.ClientPositionsGateway" async-executor="syncExecutor">
<int:method name="fetchClientPositions" request-channel="clientPositionsRequestChannel" reply-channel="clientPositionsResponseChannel"/>
<int:method name="getSecurityData" request-channel="securityDataRequestChannel" reply-channel="clientPositionsResponseChannel"/>
</int:gateway>
<int:channel id="clientPositionsRequestChannel" >
</int:channel>
<int:channel id="securityDataRequestChannel" >
</int:channel>
<int:splitter input-channel="clientPositionsRequestChannel"
output-channel="singleClientPositionsRequestChannel"
/>
<int:channel id="singleClientPositionsRequestChannel" >
<int:dispatcher task-executor="taskExecutor"/>
</int:channel>
<int:recipient-list-router input-channel="singleClientPositionsRequestChannel" >
<int:recipient channel="singleClientCommQueryChannel" />
<int:recipient channel="singleClientTransQueryChannel" />
</int:recipient-list-router>
<int:transformer
input-channel="singleClientTransQueryChannel"
output-channel="transQueryHeaderEnricherRequestChannel"
ref="dmPOSBaseTransQueryTransformer" order="2"/>
<int:header-enricher id="transQueryHeaderEnricher" input-channel="transQueryHeaderEnricherRequestChannel" output-channel="dmQueryRequestChannel" >
<int:header name="transQueryHeader" value="TRANS_QUERY_HEADER"/>
</int:header-enricher>
<int:channel id="transQueryHeaderEnricherRequestChannel" >
</int:channel>
<int:transformer
input-channel="singleClientCommQueryChannel"
output-channel="dmQueryRequestChannel"
ref="dmPOSBaseCommQueryTransformer" order="1"/>
<int:transformer
input-channel="securityDataRequestChannel"
output-channel="secQueryHeaderEnricherRequestChannel"
ref="dmSECBaseQueryTransformer" />
<int:header-enricher id="secQueryHeaderEnricher" input-channel="secQueryHeaderEnricherRequestChannel" output-channel="dmQueryRequestChannel" >
<int:header name="secQueryHeader" value="SEC_QUERY_HEADER"/>
</int:header-enricher>
<int:channel id="singleClientCommQueryChannel" >
</int:channel>
<int:channel id="transformSecurityDataRequestChannel" />
<int:channel id="singleClientTransQueryChannel" >
</int:channel>
<int:channel id="dmQueryRequestChannel" >
</int:channel>
<int:channel id="secQueryHeaderEnricherRequestChannel" >
</int:channel>
<ip:tcp-outbound-gateway id="dmServerGateway"
request-channel="dmQueryRequestChannel"
reply-channel="dmQueryResponseChannel"
connection-factory="csClient"
reply-timeout="3600000" request-timeout="3600000"
/>
<int:aggregator input-channel="dmQueryResponseChannel"
method="aggregateClientPositions"
ref="clientPositionsAggregator"
output-channel="aggregateDataResponseChannel"
correlation-strategy-expression="headers[id]"
release-strategy-expression="size() == 1"
send-partial-result-on-expiry="true" />
<int:service-activator method="createClientPosition" input-channel="aggregateDataResponseChannel" output-channel="clientPositionsResponseChannel" ref="clientPositionsService" >
</int:service-activator>
<int:channel id="dmQueryResponseChannel" >
</int:channel>
<int:channel id="securityDataResponseChannel" />
<int:channel id="aggregateDataResponseChannel" >
</int:channel>
<int:channel id="clientPositionsResponseChannel" >
Here are gateway interface methods:
Future> fetchClientPositions(List clientList);
List getSecurityData(String symbol);
Upvotes: 0
Views: 575
Reputation: 121427
Looks like your TCP service is bottleneck.
From other side I don't understand a bit your message flow:
splitter
, so your payload may be a List
recipient-list-router
and here duplicate each item after splittingtcp:outbound-gateway
, but one by one, because both are within direct channels.tcp:outbound-gateway
sends a reply to the aggregator
. But the last one has strange aggregation configuration: each group contains only one message by its id
, and completes as a list with one messageaggregator
sends reply (over service-activator
) exactly to the gateway's reply-channel
So, how about other splitted items? They will be lost, because your gateway already has got reply just after first message. All other work seems redundant...
Upvotes: 0