Satya
Satya

Reputation: 1

Cascaded aggregator not returning message

I am using Spring Integration to pull different account details from external systems concurrently (using a task executor) and aggregate them into one account object. To do that, I have to split the message at two levels and, after fetching, aggregate at two levels. I am using simple POJOs for the splitter, router and aggregator implementations. I am implementing a correlation strategy and using a simple message group, but I am not implementing any release strategy. For some reason, my messages are getting lost at the second aggregator. When I debug, I can see the messages arriving at the second-level aggregator from the first-level aggregators, but it does not pass those messages on to the output channel.

One more observation: if there is only one message, I get the output. But under any condition that results in more than one message being aggregated, I don't see any output and the threads hang.

Any help appreciated!!

Here are the context definitions.

    <!-- AccountManager bean. No other element in this snippet references it by id. -->
    <bean id="accountManager" class="<package>.AccountManager"/>

<!-- Messaging gateway for the AccountBuilder interface: requests are sent to
     "accountRequest" and replies are expected on "allAccounts".
     NOTE(review): no reply timeout is configured here, so a caller may block
     for a long time (possibly forever, depending on the framework version)
     if no reply ever reaches "allAccounts". Confirm for the version in use. -->
<int:gateway id="accountBuilder"
    service-interface="<package>.AccountBuilder" default-request-channel="accountRequest"  default-reply-channel="allAccounts"/>

<!-- Request channel fed by the gateway, and the reply channel the gateway listens on. -->
<int:channel id="accountRequest"/>
<int:channel id="allAccounts"/>
<!-- First-level split: accountSplitter.split() turns the incoming request into
     the messages that are sent to "accountRequests". -->
<int:splitter input-channel="accountRequest" output-channel="accountRequests" ref="accountSplitter" method="split"/>

<!-- Executor-backed channel: delivery to the router below is handed to
     accountServiceTaskExecutor instead of running on the sending thread. -->
<int:channel id="accountRequests">
    <int:dispatcher task-executor="accountServiceTaskExecutor"/>
</int:channel>

<!-- accountRouter.routeAccountRequests() chooses the target; the two mappings
     name the retail and manual sub-flows.
     NOTE(review): the mappings carry only a "channel" attribute and no "value";
     confirm this is accepted by the schema version in use. -->
<int:router input-channel="accountRequests" ref="accountRouter" method="routeAccountRequests">
    <int:mapping channel="retailRequest"/>
    <int:mapping channel="manualRequest"/>
</int:router>

<!-- In-memory message store. All three aggregators in this configuration point
     their message-store attribute at this single instance.
     NOTE(review): with a shared store, message groups from different
     aggregators can collide if their correlation keys are not unique across
     aggregators. Verify the keys produced by the correlation strategies, or
     give each aggregator its own store. -->
<bean id="accountMessageStore" class="org.springframework.integration.store.SimpleMessageStore" />

<!-- Reaper that expires message groups held in accountMessageStore.
     The timeout is 2000 (presumably milliseconds; confirm).
     NOTE(review): nothing in this snippet schedules the reaper's run() method;
     confirm it is triggered elsewhere, otherwise incomplete groups are never expired. -->
<bean id="searchResultMessageStoreReaper" class="org.springframework.integration.store.MessageGroupStoreReaper">
    <property name="messageGroupStore" ref="accountMessageStore" />
    <property name="timeout" value="2000" />
</bean>

<!-- **************************************************************************** -->
<!-- **************************** RETAIL ACCOUNTS ******************************* -->
<!-- **************************************************************************** -->

<!-- Entry point of the retail sub-flow (one of the two targets of the top-level router). -->
<int:channel id="retailRequest"/>

<!-- Second-level split: retailAccountSplitter.split() fans a retail request out
     into the messages sent to "retailAccountRequests". -->
<int:splitter input-channel="retailRequest" output-channel="retailAccountRequests" ref="retailAccountSplitter" method="split"/>

<!-- Executor-backed channel: delivery to the retail router is handed to
     accountServiceTaskExecutor. -->
<int:channel id="retailAccountRequests">
    <int:dispatcher task-executor="accountServiceTaskExecutor"/>
</int:channel>

<!-- retailAccountRouter.routeRetailAccountRequests() selects one of the three
     retail request channels (data, details, positions). -->
<int:router input-channel="retailAccountRequests" ref="retailAccountRouter" method="routeRetailAccountRequests">
    <int:mapping channel="retailAccountDataRequest"/>
    <int:mapping channel="retailAccountDetailsRequest"/>
    <int:mapping channel="retailAccountPositionsRequest"/>
</int:router>

<!-- Each service activator below invokes one mapper method and sends its result
     to "aggregatedRetailAccounts", the input of the retail aggregator. -->
<int:channel id="retailAccountDataRequest"/>
<int:service-activator input-channel="retailAccountDataRequest" ref="retailAccountDataMapper"
                       method="getRetailAccountData" output-channel="aggregatedRetailAccounts"/>

<int:channel id="retailAccountDetailsRequest"/>
<int:service-activator input-channel="retailAccountDetailsRequest" ref="retailAccountDetailsMapper"
                       method="getRetailAccountDetails" output-channel="aggregatedRetailAccounts"/>

<int:channel id="retailAccountPositionsRequest"/>
<int:service-activator input-channel="retailAccountPositionsRequest" ref="retailAccountPositionsMapper"
                       method="getRetailAccountPositions" output-channel="aggregatedRetailAccounts"/>

<!-- Collection point for the results of the three retail service activators. -->
<int:channel id="aggregatedRetailAccounts"/>

<!-- First-level aggregation for retail accounts: retailAccountAggregator.aggregate()
     combines a completed group and sends the result to "aggregatedAccounts".
     Groups are kept in the shared accountMessageStore and are expired once complete. -->
<int:aggregator input-channel="aggregatedRetailAccounts"
                output-channel="aggregatedAccounts"
                ref="retailAccountAggregator"
                method="aggregate"
                message-store="accountMessageStore"
                expire-groups-upon-completion="true"/>

<!-- ************************** END RETAIL ACCOUNTS ***************************** -->

<!-- **************************************************************************** -->
<!-- **************************** MANUAL ACCOUNTS ******************************* -->
<!-- **************************************************************************** -->

<!-- Entry point of the manual sub-flow (one of the two targets of the top-level router). -->
<int:channel id="manualRequest"/>

<!-- Second-level split: manualAccountSplitter.split() fans a manual request out
     into the messages sent to "manualAccountRequests". -->
<int:splitter input-channel="manualRequest" output-channel="manualAccountRequests" ref="manualAccountSplitter" method="split"/>

<!-- Executor-backed channel: delivery to the manual router is handed to
     accountServiceTaskExecutor. -->
<int:channel id="manualAccountRequests">
    <int:dispatcher task-executor="accountServiceTaskExecutor"/>
</int:channel>

<!-- manualAccountRouter.routeManualAccountRequests() selects one of the three
     manual request channels (data, details, positions). -->
<int:router input-channel="manualAccountRequests" ref="manualAccountRouter" method="routeManualAccountRequests">
    <int:mapping channel="manualAccountDataRequest"/>
    <int:mapping channel="manualAccountDetailsRequest"/>
    <int:mapping channel="manualAccountPositionsRequest"/>
</int:router>

<!-- Each service activator below invokes one mapper method and sends its result
     to "aggregatedManualAccounts", the input of the manual aggregator. -->
<int:channel id="manualAccountDataRequest"/>
<int:service-activator input-channel="manualAccountDataRequest" ref="manualAccountDataMapper"
                       method="getManualAccountData" output-channel="aggregatedManualAccounts"/>

<int:channel id="manualAccountDetailsRequest"/>
<int:service-activator input-channel="manualAccountDetailsRequest" ref="manualAccountDetailsMapper"
                       method="getManualAccountDetails" output-channel="aggregatedManualAccounts"/>

<int:channel id="manualAccountPositionsRequest"/>
<int:service-activator input-channel="manualAccountPositionsRequest" ref="manualAccountPositionsMapper"
                       method="getManualAccountPositions" output-channel="aggregatedManualAccounts"/>

<!-- Collection point for the results of the three manual service activators. -->
<int:channel id="aggregatedManualAccounts"/>

<!-- First-level aggregation for manual accounts: manualAccountAggregator.aggregate()
     combines a completed group and sends the result to "aggregatedAccounts".
     Groups are kept in the shared accountMessageStore and are expired once complete. -->
<int:aggregator input-channel="aggregatedManualAccounts"
                output-channel="aggregatedAccounts"
                ref="manualAccountAggregator"
                method="aggregate"
                message-store="accountMessageStore"
                expire-groups-upon-completion="true"/>

<!-- ************************** END MANUAL ACCOUNTS ***************************** -->

<!-- Receives the output of both first-level aggregators (retail and manual). -->
<int:channel id="aggregatedAccounts"/>

<!-- Second-level aggregation: accountAggregator.aggregate() combines the
     first-level results and sends the reply to "allAccounts", the gateway's
     reply channel.
     NOTE(review): messages arriving here have been through two nested splits.
     With the standard correlationId header the framework keeps the nested
     sequence details itself; a custom correlation strategy has to restore the
     outer correlation data before this point, otherwise the group may never be
     released. Verify against the correlation strategy implementation. -->
<int:aggregator input-channel="aggregatedAccounts"
                output-channel="allAccounts"
                ref="accountAggregator"
                method="aggregate"
                message-store="accountMessageStore"
                expire-groups-upon-completion="true"/>




<!-- Thread pool shared by the three executor-backed channels in this
     configuration ("accountRequests", "retailAccountRequests",
     "manualAccountRequests"): core size 25, maximum size 250, queue capacity 500. -->
<bean id="accountServiceTaskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
    <property name="corePoolSize" value="25" />
    <property name="maxPoolSize" value="250" />
    <property name="queueCapacity" value="500" />
</bean>

Upvotes: 0

Views: 537

Answers (1)

Gary Russell
Gary Russell

Reputation: 174799

When using the standard correlation header (correlationId), nested sequence details (correlationId, sequenceSize, sequenceNumber) are maintained automatically, by the framework, in a stack which is pushed (in the subsequent splitter) and popped (in the aggregator) under a header sequenceDetails. See MessageBuilder.pushSequenceDetails() and the corresponding popSequenceDetails() just below.

If you are using non-standard correlation, you are responsible for keeping it straight, in a similar fashion, when nesting splitters.

Non-standard correlation is not a problem, as long as you store the data in the correlationId header, if you want the framework to handle nested splits.

As always, DEBUG logging and following messages through the flow will generally identify these kinds of problems.

PS: Always try to indicate the version you are using.

Upvotes: 1

Related Questions