Chris Prior
Chris Prior

Reputation: 449

Configuring a Spring Integration aggregator to combine responses from a RabbitMq fanout exchange

I am trying to configure the following using Spring Integration:

  1. Send a message to a channel.
  2. Have this message published to a rabbit fanout (pub/sub) exchange with n consumers.
  3. Each consumer provide a response message.
  4. Have Spring Integration aggregate these responses before returning them to the original client.

I have a few problems with this so far...

  1. I am using a publish-subscribe-channel in order to set the apply-sequence="true" property so that the correlationId, sequenceSize & sequenceNumber properties are set. These properties are being thrown away by the DefaultAmqpHeaderMapper. DEBUG headerName=[correlationId] WILL NOT be mapped

  2. The sequenceSize property is only being set to 1, even though there are 2 queues registered within the fanout exchange. Presumably this would mean that the messages would be released from the aggregator too early. I expect this is becuase I am misusing the publish-subscribe-channel in order to use apply-sequence="true" and it is quite rightly saying there is only one subscriber, the int-amqp:outbound-gateway.

My outbound Spring config is as follows:

<int:publish-subscribe-channel id="output" apply-sequence="true"/>

<int:channel id="reply">
    <int:interceptors>
        <int:wire-tap channel="logger"/>
    </int:interceptors>
</int:channel>

<int:aggregator input-channel="reply" method="combine">
    <bean class="example.SimpleAggregator"/>
</int:aggregator>

<int:logging-channel-adapter id="logger" level="INFO"/>

<int:gateway id="senderGateway" service-interface="example.SenderGateway" default-request-channel="output" default-reply-channel="reply"/>

<int-amqp:outbound-gateway request-channel="output"
                                   amqp-template="amqpTemplate" exchange-name="fanout-exchange"
                                   reply-channel="reply"/>

My rabbitMQ config is as follows:

<rabbit:connection-factory id="connectionFactory" />

<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" reply-timeout="-1" />

<rabbit:admin connection-factory="connectionFactory" />

<rabbit:queue name="a-queue"/>
<rabbit:queue name="b-queue"/>

<rabbit:fanout-exchange name="fanout-exchange">
    <rabbit:bindings>
        <rabbit:binding queue="a-queue" />
        <rabbit:binding queue="b-queue" />
    </rabbit:bindings>
</rabbit:fanout-exchange>

A consumer looks like this:

<int:channel id="input"/>

<int-amqp:inbound-gateway request-channel="input" queue-names="a-queue" connection-factory="connectionFactory" concurrent-consumers="1"/>

<bean id="listenerService" class="example.ListenerService"/>

<int:service-activator input-channel="input" ref="listenerService" method="receiveMessage"/>

Any suggestions would be great, I suspect I have got the wrong end of the stick somewhere...

New outbound spring config based on Gary's comments:

<int:channel id="output"/>

<int:header-enricher input-channel="output" output-channel="output">
    <int:correlation-id expression="headers['id']" />
</int:header-enricher>

<int:gateway id="senderGateway" service-interface="example.SenderGateway" default-request-channel="output" default-reply-timeout="5000" default-reply-channel="reply" />

<int-amqp:outbound-gateway request-channel="output"
                                   amqp-template="amqpTemplate" exchange-name="fanout-exchange"
                                   reply-channel="reply"
                                   mapped-reply-headers="amqp*,correlationId" mapped-request-headers="amqp*,correlationId"/>

<int:channel id="reply"/>

<int:aggregator input-channel="reply" output-channel="reply" method="combine" release-strategy-expression="size() == 2">
    <bean class="example.SimpleAggregator"/>
</int:aggregator>

Upvotes: 4

Views: 8412

Answers (2)

rince
rince

Reputation: 2058

Even though this question is 3 years old, I'm going to respond to it because I had the same question.

Spring Integration has an implementation of Scatter-Gather that sounds very much like your original question.

Here is the relevant section from the Spring Documentation

It is a compound endpoint, where the goal is to send a message to the recipients and aggregate the results....

Previously, the pattern could be configured using discrete components, this enhancement brings more convenient configuration.

The ScatterGatherHandler is a request-reply endpoint that combines a PublishSubscribeChannel (or RecipientListRouter) and an AggregatingMessageHandler. The request message is sent to the scatter channel and the ScatterGatherHandler waits for the reply from the aggregator to sends to the outputChannel.

Upvotes: -1

Gary Russell
Gary Russell

Reputation: 174799

The problem is that S.I. doesn't know about the topology of the fanout exchange.

The simplest way around this is to use a custom release strategy

release-strategy-expression="size() == 2"

on the aggregator (assuming a fanout of 2). So, you don't need the sequence size; you can avoid "abusing" the pub/sub channel with a header-enricher...

    <int:header-enricher input-channel="foo" output-channel="bar">
        <int:correlation-id expression="T(java.util.UUID).randomUUID().toString()" />
    </int:header-enricher>

You can avoid creating a new UUID by using the message id, which is already unique...

<int:correlation-id expression="headers['id']" />

Finally, you can pass the correlationId header to AMQP by adding

mapped-request-headers="correlationId"

to your amqp endpoints.

Upvotes: 3

Related Questions