RAVI P

Reputation: 77

Parallel processing in Mule: issue with getting the right response

The requirement is to develop a Mule flow that calls 3 different synchronous services in parallel, aggregates their responses, and sends the result back to the caller.

I have followed the fork-join approach described in the docs and in "How to make parallel outbound calls". My config file looks like this:

        <flow name="fork">
            <http:inbound-endpoint host="localhost" port="8090" path="mainPath" exchange-pattern="request-response" />
            <set-property propertyName="MULE_CORRELATION_GROUP_SIZE"
                value="2" />
            <all enableCorrelation="IF_NOT_SET">
                <async>
                    <set-property propertyName="MULE_CORRELATION_SEQUENCE"
                        value="1" />
                    <flow-ref name="parallel1" />
                </async>
                <async>
                    <set-property propertyName="MULE_CORRELATION_SEQUENCE"
                        value="2" />
                    <flow-ref name="parallel2" />
                </async>
            </all>
        </flow>

        <sub-flow name="parallel1">
            <logger level="INFO" message="parallel1: processing started" />
            <!-- Payload transformation goes here -->
            <http:outbound-endpoint address="..."
                exchange-pattern="request-response" />
            <logger level="INFO" message="parallel1: processing finished" />
            <flow-ref name="join" />
        </sub-flow>

        <sub-flow name="parallel2">
            <logger level="INFO" message="parallel2: processing started" />
            <!-- Payload transformation goes here -->
            <http:outbound-endpoint address="..."
                exchange-pattern="request-response" />
            <logger level="INFO" message="parallel2: processing finished" />
            <flow-ref name="join" />
        </sub-flow>

        <sub-flow name="join">
            <collection-aggregator timeout="6000"
                failOnTimeout="true" />
            <combine-collections-transformer />
            <logger level="INFO"  message="Continuing processing of: #[message.payloadAs(java.lang.String)]" />
            <set-payload value="Soap XML Response"/>
        </sub-flow>

I can verify that everything up to and including the "join" sub-flow works, but the response that comes back is not "Soap XML Response". Instead, the caller receives the original SOAP request.

How can I make the main thread wait until sub-flow processing is complete and then send back whatever the "join" sub-flow returns?

Upvotes: 3

Views: 2127

Answers (1)

user1760178

Reputation: 6707

The fork-join in the post above looks fine. The issue is that there is no way to capture the payload after the join and bring it back into the main flow.

Because the calls to the parallel sub-flows are made inside async blocks, the main flow continues without waiting for the join output.

I have modified the flow to address this issue. The main flow now has a request-reply processor that waits for the reply and reads the joined output, so that it can be written to the HTTP response.
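
That behaviour can be seen in isolation with a small sketch, assuming Mule 3.x. The flow name `asyncDemo` and the path `demo` are hypothetical and not part of the original configuration. A payload set inside an async block is not visible to the enclosing flow:

```xml
<!-- Hypothetical flow, for illustration only -->
<flow name="asyncDemo">
    <http:inbound-endpoint host="localhost" port="8090" path="demo"
        exchange-pattern="request-response" />
    <async>
        <!-- Runs on a separate thread, against a copy of the message -->
        <set-payload value="set inside async" />
        <logger level="INFO" message="async branch payload: #[payload]" />
    </async>
    <!-- Reached without waiting for the async block. The payload here is
         still the original request, so that is what the caller gets back. -->
</flow>
```

This matches what the question describes: the caller receives the original request because nothing produced inside the async blocks ever reaches the main flow.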

    <flow name="fork">
        <http:inbound-endpoint host="localhost" port="8090" path="mainPath" exchange-pattern="request-response" />
        <!-- To get back the response after the fork-join -->
        <request-reply timeout="60000">
            <jms:outbound-endpoint queue="parallel.processor.queue">
                <message-properties-transformer scope="outbound">
                    <delete-message-property key="MULE_REPLYTO" />
                </message-properties-transformer>
            </jms:outbound-endpoint>
            <jms:inbound-endpoint queue="join.queue" />
        </request-reply>
    </flow>

    <flow name="fork_join_flow" >
        <jms:inbound-endpoint queue="parallel.processor.queue" exchange-pattern="one-way" />        
        <set-property propertyName="MULE_CORRELATION_GROUP_SIZE"
                value="2" />
        <all enableCorrelation="IF_NOT_SET">
            <async>
                <set-property propertyName="MULE_CORRELATION_SEQUENCE"
                    value="1" />
                <flow-ref name="parallel1" />
            </async>
            <async>
                <set-property propertyName="MULE_CORRELATION_SEQUENCE"
                    value="2" />
                <flow-ref name="parallel2" />
            </async>
        </all>
    </flow>

    <sub-flow name="parallel1">
        <logger level="INFO" message="parallel1: processing started" />
        <!-- Payload transformation goes here -->
        <http:outbound-endpoint address="..."
            exchange-pattern="request-response" />
        <logger level="INFO" message="parallel1: processing finished" />
        <flow-ref name="join" />
    </sub-flow>

    <sub-flow name="parallel2">
        <logger level="INFO" message="parallel2: processing started" />
        <!-- Payload transformation goes here -->
        <http:outbound-endpoint address="..."
            exchange-pattern="request-response" />
        <logger level="INFO" message="parallel2: processing finished" />
        <flow-ref name="join" />
    </sub-flow>

    <sub-flow name="join">
        <collection-aggregator timeout="6000"
            failOnTimeout="true" />
        <combine-collections-transformer />
        <logger level="INFO"  message="Continuing processing of: #[message.payloadAs(java.lang.String)]" />
        <set-payload value="Soap XML Response"/>
        <jms:outbound-endpoint queue="join.queue" />
    </sub-flow>
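
The `jms:` endpoints above also need a JMS connector in the same configuration, which is not shown here. A minimal sketch, assuming ActiveMQ is the broker and its client library is on the classpath; the connector name and broker URL are placeholders, not values from the original setup:

```xml
<!-- Assumption: ActiveMQ broker; name and brokerURL are placeholders -->
<jms:activemq-connector name="jmsConnector"
    brokerURL="tcp://localhost:61616" />
```

If this is the only JMS connector in the application, the endpoints should pick it up without an explicit `connector-ref`. Otherwise, add `connector-ref="jmsConnector"` to each `jms:` endpoint.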

Hope this helps.

Upvotes: 5
