Reputation: 2013
I've been attempting to build a flow that effectively uses a database as a queue. The reason is that another process is expected to read and reply to this message, and it was designed this way. Unfortunately, I have no control over this other process and cannot make it respond to a different queuing system.
So the flow would work like this: an HTTP request inserts a record into a database -> a separate application (outside of Mule) polls this database table for messages and responds with another message in a different table (this step may take more than 5 seconds) -> the flow reads this new row and responds to the original HTTP request.
In this design, the request-reply scope always times out waiting for the reply to appear. (I manually set the timeout to 20 seconds to show this fairly quickly.)
Response timed out (20000ms) waiting for message response id "3e1a7750-ee13-11e6-ae40-0c9920524153" or this action was interrupted. Failed to route event via endpoint: null. Message payload is of type: Integer
I've clearly missed something and can't seem to locate the correct documentation for this from Mule. I hope one of the good users of this site can point out the error of my ways.
Below are the flows and a sample of the exception.
<flow name="mainFlow">
    <http:listener config-ref="HTTP_Listener_Configuration" path="hello" doc:name="HTTP"/>
    <cxf:jaxws-service doc:name="CXF" configuration-ref="CXF_Configuration" serviceClass="kansas.MuleTestServiceImpl"/>
    <request-reply doc:name="Request-Reply" timeout="20000">
        <db:insert config-ref="Oracle_Configuration" doc:name="Database">
            <db:parameterized-query><![CDATA[insert into tblRequest (id, correlationId, replyTo) values (#[message.id], #[message.correlationId], #[message.replyTo])]]></db:parameterized-query>
        </db:insert>
        <jms:inbound-endpoint queue="test.response" connector-ref="syncJms" doc:name="JMS">
            <jms:transaction action="JOIN_IF_POSSIBLE"/>
        </jms:inbound-endpoint>
    </request-reply>
    <logger message="payload is #[payload]" level="INFO" doc:name="Logger"/>
</flow>
<flow name="databasePoller">
    <poll doc:name="Poll">
        <fixed-frequency-scheduler frequency="5000"/>
        <db:select config-ref="Oracle_Configuration" doc:name="Database">
            <db:parameterized-query><![CDATA[select id,correlationId,msgresponse,replyto from tblResponse]]></db:parameterized-query>
        </db:select>
    </poll>
    <foreach collection="#[payload]" doc:name="For Each">
        <set-variable variableName="storedPayload" value="#[payload]" doc:name="storePayload"/>
        <db:delete config-ref="Oracle_Configuration" doc:name="Database">
            <db:parameterized-query><![CDATA[delete from tblResponse where correlationId = #[storedPayload.correlationId]]]></db:parameterized-query>
        </db:delete>
        <set-payload value="#[flowVars.storedPayload]" doc:name="restorePayload"/>
        <message-properties-transformer overwrite="true" doc:name="Message Properties">
            <add-message-property key="MULE_CORRELATION_ID" value="#[payload.ID]"/>
            <add-message-property key="MULE_REPLYTO" value="#[payload.REPLYTO]"/>
        </message-properties-transformer>
        <set-payload value="#[payload.MSGRESPONSE]" doc:name="Set Payload"/>
        <jms:outbound-endpoint queue="test.response" connector-ref="syncJms" doc:name="JMS"/>
        <logger level="INFO" doc:name="Logger"/>
    </foreach>
</flow>
EXCEPTION BELOW
Message : Response timed out (20000ms) waiting for message response id "b9a93d10-efa4-11e6-808b-0c9920524153" or this action was interrupted. Failed to route event via endpoint: null. Message payload is of type: Integer
Type : org.mule.api.routing.ResponseTimeoutException
Code : MULE_ERROR--2
JavaDoc : http://www.mulesoft.org/docs/site/current3/apidocs/org/mule/api/routing/ResponseTimeoutException.html
Payload : 1
Root Exception stack trace:
org.mule.api.routing.ResponseTimeoutException: Response timed out (20000ms) waiting for message response id "b9a93d10-efa4-11e6-808b-0c9920524153" or this action was interrupted. Failed to route event via endpoint: null. Message payload is of type: Integer
    at org.mule.routing.requestreply.AbstractAsyncRequestReplyRequester.receiveAsyncReply(AbstractAsyncRequestReplyRequester.java:283)
    at org.mule.routing.requestreply.AbstractAsyncRequestReplyRequester.process(AbstractAsyncRequestReplyRequester.java:89)
Upvotes: 0
Reputation: 2013
I believe I've solved this. I got the flow working completely when the reply was triggered by an HTTP request endpoint instead of a database poller. When I compared the messages, one difference was that the message coming through my database poller carried two additional properties set by Mule: MULE_CORRELATION_SEQUENCE and MULE_CORRELATION_GROUP_SIZE.
Removing these properties just before sending the message to JMS allowed the request-reply scope to correctly identify the response in the JMS queue.
<remove-property propertyName="MULE_CORRELATION_SEQUENCE" doc:name="Property"/>
<remove-property propertyName="MULE_CORRELATION_GROUP_SIZE" doc:name="Property"/>
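For context, here is a rough sketch of where those two lines might sit in the databasePoller flow from the question. The exact position is an assumption based on "just before sending the message to jms": the tail of the foreach body, after the payload is set and before the JMS outbound endpoint. The extra logger is not part of the fix; it is only a hypothetical aid for comparing the outbound properties of a working and a failing message.

```xml
<!-- Tail of the foreach body in databasePoller (sketch; placement is an assumption) -->
<set-payload value="#[payload.MSGRESPONSE]" doc:name="Set Payload"/>

<!-- Optional debugging aid (not in the original flow): dump outbound properties before removal -->
<logger message="outbound properties: #[message.outboundProperties]" level="INFO" doc:name="Logger"/>

<!-- Drop the two extra correlation properties so the reply is matched on the correlation id alone -->
<remove-property propertyName="MULE_CORRELATION_SEQUENCE" doc:name="Property"/>
<remove-property propertyName="MULE_CORRELATION_GROUP_SIZE" doc:name="Property"/>

<jms:outbound-endpoint queue="test.response" connector-ref="syncJms" doc:name="JMS"/>
```

The rest of the flow (the poll, the delete, and the message-properties-transformer that sets MULE_CORRELATION_ID and MULE_REPLYTO) stays as posted in the question.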
Upvotes: 0
Reputation: 702
I simplified the flow a bit to get it working. It might be a fitting solution if, instead of polling the DB in a separate flow, you trigger the query with a VM queue.
<flow name="mainFlow">
    <http:listener config-ref="HTTP_Listener_Configuration"
        path="hello" doc:name="HTTP" />
    <dw:transform-message doc:name="Transform Message">
        <dw:set-payload>
            <![CDATA[%dw 1.0
%output application/java
---
{
    name: "abc",
    euro: 130,
    usd: 123
}]]></dw:set-payload>
    </dw:transform-message>
    <request-reply doc:name="Request-Reply" timeout="20000">
        <vm:outbound-endpoint exchange-pattern="one-way" path="db" doc:name="VM">
            <message-properties-transformer scope="outbound">
                <delete-message-property key="MULE_REPLYTO"/>
            </message-properties-transformer>
        </vm:outbound-endpoint>
        <vm:inbound-endpoint exchange-pattern="one-way" path="test.response" doc:name="VM"/>
    </request-reply>
    <logger message="#[message.payloadAs(java.lang.String)]" level="INFO" doc:name="Logger"/>
</flow>
<flow name="soFlow">
    <vm:inbound-endpoint exchange-pattern="one-way" path="db" doc:name="VM"/>
    <db:insert config-ref="MySQL_Configuration" doc:name="Database">
        <db:parameterized-query><![CDATA[insert into item (
            item_name,
            price_euro,
            price_usd)
            values (#[payload.name], #[payload.euro], #[payload.usd])]]></db:parameterized-query>
    </db:insert>
    <vm:outbound-endpoint exchange-pattern="one-way" path="call-db-query" doc:name="VM"/>
</flow>
<flow name="databasePoller">
    <vm:inbound-endpoint exchange-pattern="one-way" path="call-db-query" doc:name="VM"/>
    <db:select config-ref="MySQL_Configuration" doc:name="Database">
        <db:parameterized-query><![CDATA[select item_name,
            price_euro,
            price_usd from item]]></db:parameterized-query>
    </db:select>
    <vm:outbound-endpoint exchange-pattern="one-way" path="test.response" doc:name="VM"/>
</flow>
Upvotes: 0