Reputation: 228
In our architecture we have two or more containers for each application for high-availability purposes. We're using ActiveMQ and I would like to implement the following behavior.
I want only one consumer to process each message based on availability. Our implementation is in Java.
Please share the way to implement it.
Here is my code sample:
final Connection consumerConnection = connectionFactory.createConnection();
consumerConnection.start();
// Create a session.
final Session consumerSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
final Destination consumerDestination = consumerSession.createQueue(queueName);
// Create a message consumer from the session to the queue.
final MessageConsumer consumer = consumerSession.createConsumer(consumerDestination);
// Begin to wait for messages.
Queue queue = consumerSession.createQueue(queueName);
QueueBrowser queueBrowser = consumerSession.createBrowser(queue);
Enumeration msgs = queueBrowser.getEnumeration();
while (msgs.hasMoreElements()) {
//do your things here
ActiveMQTextMessage message = (ActiveMQTextMessage) msgs.nextElement();
if (message == null)
continue;
//handle message
System.out.println("Message received in : " + message);
try {
String text = message.getText();
JSONObject messageJson = new JSONObject(text);
consumer.receive(1000);
String responseString = handleMessage(messageJson);
message.acknowledge();
Thank you Moshe
Upvotes: 1
Views: 590
Reputation: 34973
The problem here is that you're acknowledging the message from the QueueBrowser instance, which has no effect since browsers are only for browsing messages, not consuming them.
...
QueueBrowser queueBrowser = consumerSession.createBrowser(queue);
Enumeration msgs = queueBrowser.getEnumeration();
while (msgs.hasMoreElements()) {
...
ActiveMQTextMessage message = (ActiveMQTextMessage) msgs.nextElement();
...
try {
...
message.acknowledge(); // this does nothing
You are actually creating a real consumer and calling consumer.receive(1000), but you're discarding the Message instance that receive() returns. This is the Message instance which you must acknowledge to actually consume the message from the queue.
...
// Create a message consumer from the session to the queue.
final MessageConsumer consumer = consumerSession.createConsumer(consumerDestination);
...
while (msgs.hasMoreElements()) {
...
try {
...
Message actualMessage = consumer.receive(1000); // acknowledge this!
...
Another important note: the messages which a queue browser receives are not guaranteed to be a static snapshot of the queue. Therefore it's dangerous to assume that the message returned by your call to consumer.receive(1000) is actually the same message which came from the queue browser. In my opinion you should rework this logic to work only with a MessageConsumer and drop the QueueBrowser.
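For illustration, a consumer-only loop might look roughly like the sketch below. It is not a drop-in replacement for your code: the class name QueueDrainer, the method names, and the handleMessage stub are hypothetical, and the timeout is whatever suits your application. It only shows the shape of the fix: receive from the MessageConsumer and acknowledge the Message that receive() returned. Since a JMS queue delivers each message to only one consumer, several containers running the same loop against the same queue compete for messages rather than each processing every one.

```java
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

final class QueueDrainer {

    // Hypothetical stand-in for the application's own handleMessage logic.
    static String handleMessage(String body) {
        return "handled: " + body;
    }

    // Receives until no message arrives within timeoutMillis.
    // Returns the number of messages processed and acknowledged.
    static int drain(MessageConsumer consumer, long timeoutMillis) throws JMSException {
        int processed = 0;
        Message message;
        while ((message = consumer.receive(timeoutMillis)) != null) {
            if (message instanceof TextMessage) {
                handleMessage(((TextMessage) message).getText());
            }
            // Acknowledge the instance returned by receive(), not a browsed copy.
            message.acknowledge();
            processed++;
        }
        return processed;
    }

    // Wires the loop to a queue using the same session settings as the question.
    static int consumeAll(ConnectionFactory connectionFactory, String queueName)
            throws JMSException {
        Connection connection = connectionFactory.createConnection();
        try {
            connection.start();
            Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            Queue queue = session.createQueue(queueName);
            MessageConsumer consumer = session.createConsumer(queue);
            return drain(consumer, 1000);
        } finally {
            // Closing the connection also closes its sessions and consumers.
            connection.close();
        }
    }
}
```

With CLIENT_ACKNOWLEDGE, if handleMessage throws before acknowledge() is reached, the message stays unacknowledged and the broker can redeliver it, possibly to another container.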
Upvotes: 1