Reputation: 13
Using a request-reply router with a collection splitter and aggregator, I can split an array of messages, hand the pieces to workers asynchronously, and then use the aggregator to consolidate the results. That part works very nicely.
Now I want to loop (synchronously) on top of the above, so I wrap it in a Foreach message processor or in another split-aggregate pair around the existing one (yes, I did save those properties as invocation properties and restore them afterwards).
I can see that the aggregator finishes for the first iteration, but the VM inbound endpoint in the request-reply router never gets anything back, so the flow gets stuck. I tried many things, but nothing helped. Any idea why?
I have two String arrays, sa: {11, 12, 13} and sb: {21, 22, 23}, in an ArrayList AL. I want to loop over AL synchronously and, for each String array, do the split-aggregate asynchronously.
Any help is very much appreciated.
Suli
David, Thank you.
I put a logger right after the request-reply router, and the flow doesn't hit it. I also have a logger right after the collection aggregator, and the flow does hit that one.
Here is the XML config:
<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns:tracking="http://www.mulesoft.org/schema/mule/ee/tracking" xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:vm="http://www.mulesoft.org/schema/mule/vm" xmlns:http="http://www.mulesoft.org/schema/mule/http" xmlns:scripting="http://www.mulesoft.org/schema/mule/scripting" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation" xmlns:spring="http://www.springframework.org/schema/beans" xmlns:core="http://www.mulesoft.org/schema/mule/core" version="EE-3.4.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="
http://www.mulesoft.org/schema/mule/vm http://www.mulesoft.org/schema/mule/vm/current/mule-vm.xsd
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
http://www.mulesoft.org/schema/mule/scripting http://www.mulesoft.org/schema/mule/scripting/current/mule-scripting.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-current.xsd
http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/ee/tracking http://www.mulesoft.org/schema/mule/ee/tracking/current/mule-tracking-ee.xsd ">
    <queued-asynchronous-processing-strategy name="all2thread" maxThreads="2" doc:name="Queued Asynchronous Processing Strategy"/>
    <flow name="splitertest2Flow1" doc:name="splitertest2Flow1">
        <http:inbound-endpoint exchange-pattern="request-response" host="localhost" port="8581" doc:name="HTTP"/>
        <expression-filter expression="#[groovy:!payload.contains('.ico')]" doc:name="Expression"/>
        <scripting:transformer doc:name="Groovy">
            <scripting:script engine="Groovy">
                <scripting:text><![CDATA[String [] sa = new String[3];
                    sa[0]="message ... 11";
                    sa[1]="message ... 12";
                    sa[2]="message ... 13";
                    String [] sb = new String[3];
                    sb[0]="message ... 21";
                    sb[1]="message ... 22";
                    sb[2]="message ... 23";
                    ArrayList al = new ArrayList();
                    al.add(sa);
                    al.add(sb);
                    message.setPayload(al);
                    return message;]]></scripting:text>
            </scripting:script>
        </scripting:transformer>
        <foreach doc:name="Foreach">
            <request-reply storePrefix="workStore">
                <vm:outbound-endpoint path="work.IN">
                    <message-properties-transformer scope="outbound">
                        <delete-message-property key="MULE_REPLYTO"/>
                    </message-properties-transformer>
                </vm:outbound-endpoint>
                <vm:inbound-endpoint path="work.OUT"></vm:inbound-endpoint>
            </request-reply>
            <logger message="******** Almost there....." level="INFO" doc:name="Logger"/>
        </foreach>
        <logger message="************** Very Happy to get here **********************" level="INFO" doc:name="Logger"/>
    </flow>
    <flow name="splitertest2Flow2" doc:name="splitertest2Flow2">
        <vm:inbound-endpoint exchange-pattern="one-way" path="work.IN" doc:name="VM"/>
        <collection-splitter doc:name="Collection Splitter"/>
        <flow-ref name="DoWork2" doc:name="DoWork2"/>
    </flow>
    <flow name="DoWork2" doc:name="DoWork2" processingStrategy="all2thread">
        <scripting:transformer doc:name="Groovy">
            <scripting:script engine="Groovy">
                <scripting:text><![CDATA[String msg = message.getPayload();
                    println "processing..."+msg;
                    Thread.sleep(1500);
                    println "exit..."+msg;
                    return message;]]></scripting:text>
            </scripting:script>
        </scripting:transformer>
        <vm:outbound-endpoint exchange-pattern="one-way" path="work.Q" doc:name="VM"/>
    </flow>
    <flow name="splitertest2Flow3" doc:name="splitertest2Flow3" processingStrategy="all2thread">
        <vm:inbound-endpoint exchange-pattern="one-way" path="work.Q" doc:name="VM"/>
        <collection-aggregator failOnTimeout="true" doc:name="Collection Aggregator"/>
        <logger message="************ after aggregator ************" level="INFO" doc:name="Logger"/>
        <vm:outbound-endpoint exchange-pattern="one-way" path="work.OUT" doc:name="VM"/>
    </flow>
</mule>
Upvotes: 1
Views: 2790
Reputation: 33413
There are two issues at play that prevent this from working:
1. The foreach element uses the same correlation ID for all the messages it creates, which completely confuses the collection-aggregator downstream. The aggregator works by grouping on this ID, and since it is the same for all six messages, it can't work. To fix this, I had to assign a new correlation ID as the first step inside foreach.
2. request-reply computes an "async reply correlation ID" that must be used when dispatching to the reply queue (work.OUT). Usually this "async reply correlation ID" is equal to the message correlation ID, but not in this case (I suspect because we are behind a foreach). To fix this, I had to store the asyncReplyCorrelationId in a session variable and re-establish it as the correlation ID right before dispatching to the reply queue.
Here is the complete working config:
<queued-asynchronous-processing-strategy
    name="all2thread" maxThreads="2" />
<flow name="splitertest2Flow1">
    <http:inbound-endpoint exchange-pattern="request-response"
        host="localhost" port="8581" />
    <expression-filter expression="#[groovy:!payload.contains('.ico')]" />
    <scripting:transformer>
        <scripting:script engine="Groovy">
            <scripting:text><![CDATA[
                String [] sa = new String[3];
                sa[0]="message ... 11";
                sa[1]="message ... 12";
                sa[2]="message ... 13";
                String [] sb = new String[3];
                sb[0]="message ... 21";
                sb[1]="message ... 22";
                sb[2]="message ... 23";
                ArrayList al = new ArrayList();
                al.add(sa);
                al.add(sb);
                message.setPayload(al);
                return message;
            ]]></scripting:text>
        </scripting:script>
    </scripting:transformer>
    <foreach>
        <scripting:transformer>
            <scripting:script engine="Groovy">
                <scripting:text><![CDATA[
                    message.correlationId = UUID.randomUUID().toString()
                    return message
                ]]></scripting:text>
            </scripting:script>
        </scripting:transformer>
        <request-reply storePrefix="workStore">
            <vm:outbound-endpoint path="work.IN">
                <message-properties-transformer
                    scope="outbound">
                    <delete-message-property key="MULE_REPLYTO" />
                </message-properties-transformer>
                <message-properties-transformer
                    scope="session">
                    <add-message-property key="asyncReplyCorrelationId"
                        value="#[message.correlationId + message.correlationSequence]" />
                </message-properties-transformer>
            </vm:outbound-endpoint>
            <vm:inbound-endpoint path="work.OUT" />
        </request-reply>
        <logger message="******** Almost there....." level="INFO" />
    </foreach>
    <logger message="************** Very Happy to get here **********************"
        level="INFO" />
</flow>
<flow name="splitertest2Flow2">
    <vm:inbound-endpoint exchange-pattern="one-way"
        path="work.IN" />
    <collection-splitter />
    <flow-ref name="DoWork2" />
</flow>
<flow name="DoWork2">
    <scripting:transformer>
        <scripting:script engine="Groovy">
            <scripting:text><![CDATA[
                String msg = message.getPayload();
                println "processing..."+msg;
                Thread.sleep(1500);
                println "exit..."+msg;
                return message;
            ]]></scripting:text>
        </scripting:script>
    </scripting:transformer>
    <vm:outbound-endpoint exchange-pattern="one-way"
        path="work.Q" />
</flow>
<flow name="splitertest2Flow3" processingStrategy="all2thread">
    <vm:inbound-endpoint exchange-pattern="one-way"
        path="work.Q" />
    <collection-aggregator failOnTimeout="true" />
    <logger message="************ after aggregator ************"
        level="INFO" />
    <scripting:transformer>
        <scripting:script engine="Groovy">
            <scripting:text><![CDATA[
                message.correlationId = asyncReplyCorrelationId
                return message
            ]]></scripting:text>
        </scripting:script>
    </scripting:transformer>
    <vm:outbound-endpoint exchange-pattern="one-way"
        path="work.OUT" />
</flow>
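To see why both fixes matter, the two issues can be modelled outside Mule. The Java sketch below is only an illustration and not Mule's actual implementation: the class CorrelationSketch, its Msg type, the IDs "shared", "id-A" and "id-B", and the sequence number 1 are all hypothetical. It assumes only what is stated above, namely that the aggregator groups on the correlation ID, and that the reply is matched on the correlation ID plus the correlation sequence (as in the asyncReplyCorrelationId expression of the config).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Simplified model of correlation handling; NOT Mule's real implementation. */
public class CorrelationSketch {

    /** Hypothetical stand-in for a message: only the fields relevant here. */
    public static final class Msg {
        final String correlationId;
        final String payload;

        Msg(String correlationId, String payload) {
            this.correlationId = correlationId;
            this.payload = payload;
        }
    }

    /** Groups payloads by correlation ID, the key an aggregator would group on. */
    public static Map<String, List<String>> groupByCorrelationId(List<Msg> messages) {
        Map<String, List<String>> groups = new LinkedHashMap<>();
        for (Msg m : messages) {
            groups.computeIfAbsent(m.correlationId, k -> new ArrayList<>()).add(m.payload);
        }
        return groups;
    }

    /** Builds the six split messages, tagging each batch of three with the given ID. */
    public static List<Msg> split(String idForFirstBatch, String idForSecondBatch) {
        List<Msg> out = new ArrayList<>();
        for (String p : List.of("11", "12", "13")) out.add(new Msg(idForFirstBatch, p));
        for (String p : List.of("21", "22", "23")) out.add(new Msg(idForSecondBatch, p));
        return out;
    }

    /** A reply reaches the waiting requester only if its ID equals the awaited key. */
    public static boolean replyMatches(Map<String, String> pendingByReplyId, String replyId) {
        return pendingByReplyId.containsKey(replyId);
    }

    public static void main(String[] args) {
        // Issue 1: both iterations share one ID, so the batches cannot be told apart.
        System.out.println(groupByCorrelationId(split("shared", "shared")));
        // Fix 1: a fresh ID per iteration yields two separate groups of three.
        System.out.println(groupByCorrelationId(split("id-A", "id-B")));

        // Issue 2: the requester waits on correlationId + correlationSequence
        // (assumed from the config's expression; the value 1 is made up).
        Map<String, String> pending = new LinkedHashMap<>();
        pending.put("id-A" + 1, "iteration 1");
        System.out.println(replyMatches(pending, "id-A"));     // plain correlation ID
        System.out.println(replyMatches(pending, "id-A" + 1)); // restored async reply ID
    }
}
```

In this model, the shared ID produces a single group of six while distinct IDs produce two groups of three, and a reply carrying only the plain correlation ID never matches the key the requester is waiting on, which corresponds to the request-reply router appearing stuck.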
Upvotes: 2