Reputation: 449
I am trying to configure the following using Spring Integration:
I have a few problems with this so far...
I am using a publish-subscribe-channel in order to set the apply-sequence="true"
property so that the correlationId, sequenceSize & sequenceNumber properties are set. These properties are being thrown away by the DefaultAmqpHeaderMapper
. DEBUG headerName=[correlationId] WILL NOT be mapped
The sequenceSize property is only being set to 1, even though there are 2 queues registered within the fanout exchange. Presumably this would mean that the messages would be released from the aggregator too early. I expect this is becuase I am misusing the publish-subscribe-channel in order to use apply-sequence="true"
and it is quite rightly saying there is only one subscriber, the int-amqp:outbound-gateway
.
My outbound Spring config is as follows:
<int:publish-subscribe-channel id="output" apply-sequence="true"/>
<int:channel id="reply">
<int:interceptors>
<int:wire-tap channel="logger"/>
</int:interceptors>
</int:channel>
<int:aggregator input-channel="reply" method="combine">
<bean class="example.SimpleAggregator"/>
</int:aggregator>
<int:logging-channel-adapter id="logger" level="INFO"/>
<int:gateway id="senderGateway" service-interface="example.SenderGateway" default-request-channel="output" default-reply-channel="reply"/>
<int-amqp:outbound-gateway request-channel="output"
amqp-template="amqpTemplate" exchange-name="fanout-exchange"
reply-channel="reply"/>
My rabbitMQ config is as follows:
<rabbit:connection-factory id="connectionFactory" />
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" reply-timeout="-1" />
<rabbit:admin connection-factory="connectionFactory" />
<rabbit:queue name="a-queue"/>
<rabbit:queue name="b-queue"/>
<rabbit:fanout-exchange name="fanout-exchange">
<rabbit:bindings>
<rabbit:binding queue="a-queue" />
<rabbit:binding queue="b-queue" />
</rabbit:bindings>
</rabbit:fanout-exchange>
A consumer looks like this:
<int:channel id="input"/>
<int-amqp:inbound-gateway request-channel="input" queue-names="a-queue" connection-factory="connectionFactory" concurrent-consumers="1"/>
<bean id="listenerService" class="example.ListenerService"/>
<int:service-activator input-channel="input" ref="listenerService" method="receiveMessage"/>
Any suggestions would be great, I suspect I have got the wrong end of the stick somewhere...
New outbound spring config based on Gary's comments:
<int:channel id="output"/>
<int:header-enricher input-channel="output" output-channel="output">
<int:correlation-id expression="headers['id']" />
</int:header-enricher>
<int:gateway id="senderGateway" service-interface="example.SenderGateway" default-request-channel="output" default-reply-timeout="5000" default-reply-channel="reply" />
<int-amqp:outbound-gateway request-channel="output"
amqp-template="amqpTemplate" exchange-name="fanout-exchange"
reply-channel="reply"
mapped-reply-headers="amqp*,correlationId" mapped-request-headers="amqp*,correlationId"/>
<int:channel id="reply"/>
<int:aggregator input-channel="reply" output-channel="reply" method="combine" release-strategy-expression="size() == 2">
<bean class="example.SimpleAggregator"/>
</int:aggregator>
Upvotes: 4
Views: 8412
Reputation: 2058
Even though this question is 3 years old, I'm going to respond to it because I had the same question.
Spring Integration has an implementation of Scatter-Gather that sounds very much like your original question.
Here is the relevant section from the Spring Documentation
It is a compound endpoint, where the goal is to send a message to the recipients and aggregate the results....
Previously, the pattern could be configured using discrete components, this enhancement brings more convenient configuration.
The ScatterGatherHandler is a request-reply endpoint that combines a PublishSubscribeChannel (or RecipientListRouter) and an AggregatingMessageHandler. The request message is sent to the scatter channel and the ScatterGatherHandler waits for the reply from the aggregator to sends to the outputChannel.
Upvotes: -1
Reputation: 174799
The problem is that S.I. doesn't know about the topology of the fanout exchange.
The simplest way around this is to use a custom release strategy
release-strategy-expression="size() == 2"
on the aggregator (assuming a fanout of 2). So, you don't need the sequence size; you can avoid "abusing" the pub/sub channel with a header-enricher...
<int:header-enricher input-channel="foo" output-channel="bar">
<int:correlation-id expression="T(java.util.UUID).randomUUID().toString()" />
</int:header-enricher>
You can avoid creating a new UUID by using the message id, which is already unique...
<int:correlation-id expression="headers['id']" />
Finally, you can pass the correlationId header to AMQP by adding
mapped-request-headers="correlationId"
to your amqp endpoints.
Upvotes: 3