Reputation: 321
This question was asked 4 months ago.
https://stackoverflow.com/posts/16241300/edit
Anyone?
"I have written a quartz code in mule flow to consume all the messages from queue every 5 minutes.
<quartz:inbound-endpoint jobName="abc" cronExpression="0 0/1 * * * ?" doc:name="Quartz">
<quartz:endpoint-polling-job>
<quartz:job-endpoint ref="jmsEndPoint" />
</quartz:endpoint-polling-job>
</quartz:inbound-endpoint>
But this above code consumes only one message at a time even if there are 5 messages in the queue.
My requirement is to run a job every 5 minutes and consumes all the messages from queue.
Another requirements is to filter out duplicate messages using unique identifier from within the message payload.
Any help will be appreciated. "
EDIT: JMS endpoint
<jms:endpoint name="jmsEndPoint" queue="MyQueue" connector-ref="connector"/>
Upvotes: 2
Views: 1674
Reputation: 12225
In your Mule-config xml:
<quartz:connector name="quartzConnector">
<receiver-threading-profile
maxThreadsActive="1" />
</quartz:connector>
<flow name="DelayedMessageProcessing">
<quartz:inbound-endpoint name="qEP6"
cronExpression="${some.cron.expression}"
jobName="DelayedProcessing"
connector-ref="quartzConnector">
<jms:transaction action="ALWAYS_BEGIN" />
<quartz:event-generator-job />
</quartz:inbound-endpoint
<component class="com.something.myComponent" />
</flow>
.. and the Java component:
package com.something;
import org.mule.api.MuleEventContext;
import org.mule.api.MuleException;
import org.mule.api.MuleMessage;
import org.mule.api.lifecycle.Callable;
public class MyComponent implements Callable {
public Object onCall(final MuleEventContext muleEventContext) throws Exception {
MuleMessage delayedMessage = fetchMessage(muleEventContext);
while (delayedMessage != null) {
//You might have to copy properties from inbound to outbound scope here..
muleEventContext.dispatchEvent(delayedMessage, "some.jms.endpoint");
delayedMessage = fetchMessage(muleEventContext);
}
return null;
}
private MuleMessage fetchMessage(final MuleEventContext muleEventContext) throws MuleException {
return muleEventContext.requestEvent("some.delayed.jms.endpoint", 3000);
}
}
Upvotes: 0
Reputation: 7551
Looking at your code it looks like you will need to read it like this:
muleEventContext.requestEvent("MyQueue", -1);
If you want to filter on id you can do this:
<idempotent-message-filter idExpression="#[message:id]-#[header:foo]">
<simple-text-file-store directory="./idempotent"/>
</idempotent-message-filter>
Upvotes: 1
Reputation: 699
Queues are event based and are designed to return just one message (first in first out). In order to consume all messages from form a queue in a Mule flow, one approach is to create a custom component which will programmatically consume a jms message from a queue until there are no more messages.
In order to filter duplicate messages, consider using Mule's idempotent router:
HTH
Upvotes: 2