Reputation: 43
I have a list of objects that I currently process in a foreach. The list is just a set of id strings, each of which kicks off other work internally.
<flow name="flow1" processingStrategy="synchronous">
    <quartz:inbound-endpoint jobName="integration" repeatInterval="86400000" responseTimeout="10000" doc:name="Quartz" >
        <quartz:event-generator-job/>
    </quartz:inbound-endpoint>
    <component class="RequestFeeder" doc:name="RequestFeeder"/>
    <foreach collection="#[payload]" doc:name="For Each">
        <flow-ref name="createFlow" doc:name="createFlow"/>
        <flow-ref name="queueFlow" doc:name="queueFlow"/>
        <flow-ref name="statusCheckFlow" doc:name="statusCheckFlow"/>
        <flow-ref name="resultsFlow" doc:name="resultsFlow"/>
        <flow-ref name="sftpFlow" doc:name="sftpFlow"/>
        <logger message="RequestType #[flowVars['rqstType']] complete" level="INFO" doc:name="Done"/>
    </foreach>
    <logger message="ALL 15 REQUESTS HAVE BEEN PROCESSED" level="INFO" doc:name="Logger"/>
</flow>
I want to process them in parallel, i.e. execute the same 4 flow-refs in parallel for all 15 requests in the list. This seems simple, but I haven't been able to figure it out yet. Any help appreciated.
Upvotes: 1
Views: 3845
Reputation: 21
You say 4 flows, but the list contains 5 flows. If you want all flows executed in sequence, but each item in the collection executed in parallel, you will want a splitter followed by a separate vm flow containing all (4/5) flows, as explained here: https://support.mulesoft.com/s/article/Concurrently-processing-Collection-and-getting-the-results.
If you want the flows inside the loop to execute in parallel, then choose a Scatter-Gather component.
It is important to be clear about which of the two you want to achieve, as the solutions are very different. The basic difference is that in Scatter-Gather a single message is sent to multiple recipients for processing in parallel, whereas in Splitter-Aggregator a single message is split into multiple sub-messages that are processed individually and then aggregated. See: http://muthurajud.blogspot.com/2016/07/eai-patterns-scattergather-versus.html
Upvotes: 1
Reputation: 8321
Scatter-gather is one of the Mule components that makes parallel processing easy. A simple example follows:
<scatter-gather>
    <flow-ref name="flow1" />
    <flow-ref name="flow2" />
    <flow-ref name="flow3" />
</scatter-gather>
So, the flows you want to execute in parallel can be kept inside the scatter-gather element.
Upvotes: 0
Reputation: 2475
An alternative to the scatter-gather approach is to simply split the collection and use a VM queue for the items in the list. This method can be simpler if you don't need to wait and collect all 15 results, and will still work if you do.
Try something like this. Mule automatically uses a thread pool to run your flow, so the requestProcessor flow below will process your requests in parallel.
<flow name="scheduleRequests">
    <quartz:inbound-endpoint jobName="integration" repeatInterval="86400000" responseTimeout="10000" doc:name="Quartz" >
        <quartz:event-generator-job/>
    </quartz:inbound-endpoint>
    <component class="RequestFeeder" doc:name="RequestFeeder"/>
    <collection-splitter />
    <vm:outbound-endpoint path="requests" />
</flow>
<flow name="requestProcessor">
    <vm:inbound-endpoint path="requests" />
    <flow-ref name="createFlow" doc:name="createFlow"/>
    <flow-ref name="queueFlow" doc:name="queueFlow"/>
    <flow-ref name="statusCheckFlow" doc:name="statusCheckFlow"/>
    <flow-ref name="resultsFlow" doc:name="resultsFlow"/>
    <flow-ref name="sftpFlow" doc:name="sftpFlow"/>
</flow>
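If the default pool size does not suit you, a flow can reference a named processing strategy that caps the number of threads. This is a minimal sketch assuming Mule 3.x; the strategy name parallelRequests and the maxThreads value of 15 are illustrative choices, not part of the original configuration:

```xml
<!-- Hypothetical named strategy: limits the flow's thread pool to 15 threads -->
<queued-asynchronous-processing-strategy name="parallelRequests" maxThreads="15" doc:name="Queued Asynchronous Processing Strategy"/>

<!-- Same requestProcessor flow, now bound to the named strategy -->
<flow name="requestProcessor" processingStrategy="parallelRequests">
    <vm:inbound-endpoint path="requests" />
    <flow-ref name="createFlow" doc:name="createFlow"/>
    <flow-ref name="queueFlow" doc:name="queueFlow"/>
    <flow-ref name="statusCheckFlow" doc:name="statusCheckFlow"/>
    <flow-ref name="resultsFlow" doc:name="resultsFlow"/>
    <flow-ref name="sftpFlow" doc:name="sftpFlow"/>
</flow>
```

With 15 items in the list, a pool of 15 would let every request run at once; a smaller value throttles how many run concurrently.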
Upvotes: 3
Reputation: 712
I reckon you still want those four flows to run sequentially for each item, right? If that were not the case, you could always change the threading profile.
Another thing you could do is wrap the four flows in an async scope, although you may need a processor change.
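The async-scope idea could look roughly like the following. This is an untested sketch assuming Mule 3.x, reusing the flow names from the question; whether the async scope also needs its own processingStrategy attribute inside a synchronous flow is something to check against your Mule version:

```xml
<foreach collection="#[payload]" doc:name="For Each">
    <!-- Each iteration hands its item to the async scope and continues without waiting -->
    <async doc:name="Async">
        <!-- Inside the scope the flows still run one after another for this item -->
        <flow-ref name="createFlow" doc:name="createFlow"/>
        <flow-ref name="queueFlow" doc:name="queueFlow"/>
        <flow-ref name="statusCheckFlow" doc:name="statusCheckFlow"/>
        <flow-ref name="resultsFlow" doc:name="resultsFlow"/>
        <flow-ref name="sftpFlow" doc:name="sftpFlow"/>
    </async>
</foreach>
```

Note that the foreach no longer waits for each item, so a logger placed after it (such as the "ALL 15 REQUESTS HAVE BEEN PROCESSED" message) would fire before the work has actually finished.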
In any event, I think you'll be better off using the scatter-gather component, which, without needing the for-each scope, will split the list and execute each item in a different thread. You can define how many threads run in parallel, so rather than spinning off a new thread for each item you use a pool.
One final note, though: scatter-gather is meant to aggregate the results of all the processed items. I reckon you could change that with a custom aggregation strategy, but I'm not really sure; please take a look at the docs for that.
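For scatter-gather, a custom aggregation strategy is declared as a child element ahead of the routes. A minimal sketch, assuming Mule 3.5 or later; com.example.RequestAggregationStrategy is a hypothetical class that you would write yourself, implementing org.mule.routing.AggregationStrategy, and the route names are placeholders:

```xml
<scatter-gather doc:name="Scatter-Gather">
    <!-- Hypothetical class: decides how the results of the routes are combined -->
    <custom-aggregation-strategy class="com.example.RequestAggregationStrategy"/>
    <!-- Each route receives the same message and runs in parallel -->
    <flow-ref name="flow1" doc:name="flow1"/>
    <flow-ref name="flow2" doc:name="flow2"/>
    <flow-ref name="flow3" doc:name="flow3"/>
</scatter-gather>
```

Without the custom-aggregation-strategy element, Mule falls back to its default behaviour of collecting the route results into a single message.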
HTH
Upvotes: 1