RV.

Reputation: 3018

How to aggregate response from multiple channels

I have a Spring Integration implementation with the following setup:

  1. Multiple publishing channels publish to one common channel.
  2. All the channels return the same response object.
  3. An aggregator tries to aggregate the responses from all of the above channels.

Issue: The aggregator is not able to combine all the responses. The provided method gets invoked on the first response from each channel instead.

Here are the details. What do I have to do to aggregate the responses?

<int:publish-subscribe-channel id="aggregate-channel" apply-sequence="true"/>
<int:publish-subscribe-channel id="input-channel" apply-sequence="true"/>

<int:service-activator input-channel="input-channel" output-channel="aggregate-channel" ref="...A" method="...A"/>
<int:service-activator input-channel="input-channel" output-channel="aggregate-channel" ref="...B" method="...B"/>
<int:service-activator input-channel="input-channel" output-channel="aggregate-channel" ref="...C" method="...C"/>
<int:service-activator input-channel="input-channel" output-channel="aggregate-channel" ref="...D" method="...D"/>

<!-- This is the aggregator.
     Expected: one list of size 4.
     Actual: the method receives a list of size 1 for each response channel.
-->
<int:aggregator input-channel="aggregate-channel" output-channel="gateway-response-channel" ref="Service" method="responseListProcessor"/>

Upvotes: 1

Views: 609

Answers (2)

Artem Bilan

Reputation: 121560

I would say your requirement is fully covered by the EIP designed specifically for this kind of task, Scatter-Gather: https://docs.spring.io/spring-integration/docs/current/reference/html/message-routing.html#scatter-gather

Upvotes: 0

Solution

  • Replace
<int:publish-subscribe-channel id="aggregate-channel" apply-sequence="true"/>
    with
<int:publish-subscribe-channel id="aggregate-channel"/>

Issue

  • Initially, each message gets a sequence size of 4, because input-channel has 4 subscribers.
  • But when you add the attribute apply-sequence="true" to aggregate-channel, it resets the sequence size to 1, because aggregate-channel has only one subscriber, which is the aggregator. The aggregator then treats every response as a complete group and releases it immediately.
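
Putting the fix together, the corrected configuration could look roughly like the sketch below. It reuses the channel ids and attributes from the question, keeps the elided ref and method values exactly as posted, and omits the enclosing beans element and namespace declarations. It assumes the aggregator relies on its default correlation and release strategies (correlation ID and sequence size headers) and that the service activators return plain payloads, so the request headers are copied to the replies.

```xml
<!-- Four subscribers: each copy gets sequence-size 4, sequence-number 1..4
     and a shared correlation ID -->
<int:publish-subscribe-channel id="input-channel" apply-sequence="true"/>

<!-- No apply-sequence here, so the headers set by input-channel survive -->
<int:publish-subscribe-channel id="aggregate-channel"/>

<int:service-activator input-channel="input-channel" output-channel="aggregate-channel" ref="...A" method="...A"/>
<int:service-activator input-channel="input-channel" output-channel="aggregate-channel" ref="...B" method="...B"/>
<int:service-activator input-channel="input-channel" output-channel="aggregate-channel" ref="...C" method="...C"/>
<int:service-activator input-channel="input-channel" output-channel="aggregate-channel" ref="...D" method="...D"/>

<!-- The group is released once 4 messages with the same correlation ID arrive -->
<int:aggregator input-channel="aggregate-channel" output-channel="gateway-response-channel" ref="Service" method="responseListProcessor"/>
```

With this in place, responseListProcessor should be called once with a list of four responses, provided all four service activators actually produce a reply.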

Reference

https://docs.spring.io/spring-integration/docs/5.1.7.RELEASE/reference/html/#channel-configuration-pubsubchannel

If you provide an aggregator downstream from a PublishSubscribeChannel, you can set the 'apply-sequence' property on the channel to true.

Doing so indicates that the channel should set the sequence-size and sequence-number message headers as well as the correlation ID prior to passing along the messages.

For example, if there are five subscribers, the sequence-size would be set to 5, and the messages would have sequence-number header values ranging from 1 to 5.

Upvotes: 1
