Andre Coetzee

Reputation: 1310

How to aggregate inbound JMS messages in mule

I am receiving two JMS messages from ActiveMQ:

<flow name="clientoneFlow1" doc:name="clientoneFlow1">
    <http:inbound-endpoint exchange-pattern="one-way" host="localhost" port="8081" path="client1" doc:name="HTTP"/>
    <component class="SalesOrder" doc:name="Java"/>
    <json:object-to-json-transformer doc:name="Object to JSON"/>
    <jms:outbound-endpoint queue="ReadOrder1" connector-ref="Active_MQ" doc:name="JMS">
        <jms:object-to-jmsmessage-transformer doc:name="Object to JMSMessage"/>
    </jms:outbound-endpoint>
</flow>

<flow name="clienttwoFlow1" doc:name="clienttwoFlow1">
    <http:inbound-endpoint exchange-pattern="one-way" host="localhost" port="8081" path="client2" doc:name="HTTP"/>
    <component class="SalesOrder2" doc:name="Java"/>
    <json:object-to-json-transformer doc:name="Object to JSON"/>
    <jms:outbound-endpoint queue="ReadOrder1" connector-ref="Active_MQ" doc:name="JMS">
        <jms:object-to-jmsmessage-transformer doc:name="Object to JMSMessage"/>
    </jms:outbound-endpoint>
</flow>

<flow name="integration-flow" doc:name="integration-Flow1" processingStrategy="synchronous">
    <jms:inbound-endpoint queue="ReadOrder1" connector-ref="Active_MQ" doc:name="JMS"/>
    <vm:outbound-endpoint exchange-pattern="request-response" path="vm" doc:name="VM"/>
    <logger message="ending payload = #[payload]" level="INFO" doc:name="Logger"/>
</flow>
<flow name="integration-flow2" doc:name="integration-Flow2">
    <vm:inbound-endpoint exchange-pattern="request-response" path="vm" doc:name="VM"/>
    <collection-aggregator failOnTimeout="true" doc:name="Collection Aggregator"/>
</flow>

How can I aggregate the incoming messages? I am using a collection-aggregator, but I keep getting this message: "Correlation Group Size not set, but correlation aggregator is being used. Message is being forwarded as is"

The message is JSON: [{"salesOrderId":"00001-2-3","saleName":"Car Sale","status":"processing"}]

Upvotes: 0

Views: 942

Answers (1)

Anirban Sen Chowdhary

Reputation: 8321

I see some issues in the flow. Both clientoneFlow1 and clienttwoFlow1 use an HTTP inbound endpoint, so you have to hit each URL separately to start the flow and dispatch its message to queue="ReadOrder1".

As soon as one of those flows delivers a message to queue="ReadOrder1", integration-flow starts and dispatches the message to integration-flow2, where the collection-aggregator receives it. The aggregator does not wait for the other flow's message, because that flow only starts once you hit its URL.

So you need both clientoneFlow1 and clienttwoFlow1 to execute almost in parallel and deliver their messages for aggregation at almost the same time.

One way to achieve this is the scatter-gather component, in which case you don't need a collection-aggregator at all.

The messages returned by all the referenced flows are aggregated automatically into a single combined payload.

For example, you can do the following:

<flow name="fork" doc:name="fork">
    <http:inbound-endpoint host="localhost" port="8090" path="scattergather" exchange-pattern="request-response" doc:name="HTTP"/>

    <scatter-gather timeout="6000">
        <!-- Calling clientoneFlow1 -->
        <flow-ref name="clientoneFlow1" doc:name="Flow Reference"/>

        <!-- Calling clienttwoFlow1 -->
        <flow-ref name="clienttwoFlow1" doc:name="Flow Reference"/>
    </scatter-gather>

    <logger level="INFO" message="Combined Payload: #[message.payload]" doc:name="Logger"/>
    <logger level="INFO" message="Payload1 of clientoneFlow1 : #[message.payload[0]] and clienttwoFlow1: #[message.payload[1]] " doc:name="Logger"/>
</flow>

<flow name="clientoneFlow1" doc:name="clientoneFlow1">
    <http:inbound-endpoint exchange-pattern="one-way" host="localhost" port="8081" path="client1" doc:name="HTTP"/>
    <component class="SalesOrder" doc:name="Java"/>
    <json:object-to-json-transformer doc:name="Object to JSON"/>
</flow>

<flow name="clienttwoFlow1" doc:name="clienttwoFlow1">
    <http:inbound-endpoint exchange-pattern="one-way" host="localhost" port="8081" path="client2" doc:name="HTTP"/>
    <component class="SalesOrder2" doc:name="Java"/>
    <json:object-to-json-transformer doc:name="Object to JSON"/>
</flow>

This is the easiest way to get a combined payload from both flows without making things too complex.
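
If you would rather keep the collection-aggregator, the warning in the question suggests the aggregator could not find a correlation group size on the incoming messages. Below is a rough, untested sketch of what integration-flow2 might do before aggregating, assuming Mule 3.x and its standard MULE_CORRELATION_ID and MULE_CORRELATION_GROUP_SIZE message properties. The id value "salesOrders", the group size of 2, and the 60-second timeout are illustrative assumptions, not values from the question.

```xml
<!-- Sketch only: these processors would replace the bare collection-aggregator in integration-flow2 -->

<!-- Assumption: a fixed, hypothetical id that puts both messages in the same group -->
<set-property propertyName="MULE_CORRELATION_ID" value="salesOrders" doc:name="Correlation ID"/>

<!-- Assumption: exactly two messages (one per client flow) make up a group -->
<set-property propertyName="MULE_CORRELATION_GROUP_SIZE" value="2" doc:name="Group Size"/>

<!-- timeout is in milliseconds; 60000 is an arbitrary illustrative value -->
<collection-aggregator timeout="60000" failOnTimeout="true" doc:name="Collection Aggregator"/>
```

A fixed correlation id is probably only good for a quick test, since Mule may treat later messages with the same id as belonging to a group that has already completed. In a real flow the id would more likely be derived from something the related messages share, such as the order id.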

Upvotes: 1
