Moshe

Reputation: 228

Using ActiveMQ with more than one consumer instance

In our architecture we have two or more containers for each application for high-availability purposes. We're using ActiveMQ, and I would like to implement the following behavior.

  1. A message is pushed to a queue.
  2. The message is consumed by only *one* container (the first one to read it).
  3. Once the message has been processed successfully, the consumer acknowledges it so that it can be discarded.
  4. I tried using a transaction with commit, and also CLIENT_ACKNOWLEDGE; however, in both cases both consumers got the message and processed it.

I want only one consumer to process each message based on availability. Our implementation is in Java.

Please share the way to implement it.

Here is my code sample:

    final Connection consumerConnection = connectionFactory.createConnection();
    consumerConnection.start();
    // Create a session.
    final Session consumerSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

    final Destination consumerDestination = consumerSession.createQueue(queueName);


    // Create a message consumer from the session to the queue.
    final MessageConsumer consumer = consumerSession.createConsumer(consumerDestination);
    // Begin to wait for messages.
    Queue queue = consumerSession.createQueue(queueName);
    QueueBrowser queueBrowser = consumerSession.createBrowser(queue);
    Enumeration msgs = queueBrowser.getEnumeration();
    while (msgs.hasMoreElements()) {
        //do your things here

        ActiveMQTextMessage message = (ActiveMQTextMessage) msgs.nextElement();

        if (message == null)
            continue;

        //handle message
        System.out.println("Message received in : " + message);
        try {
            String text = message.getText();
            JSONObject messageJson = new JSONObject(text);
            consumer.receive(1000);

            String responseString = handleMessage(messageJson);

            message.acknowledge();

Thank you, Moshe

Upvotes: 1

Views: 590

Answers (1)

Justin Bertram

Reputation: 34973

The problem here is that you're acknowledging the message from the QueueBrowser instance, which has no effect because browsers only browse messages; they do not consume them.

...
QueueBrowser queueBrowser = consumerSession.createBrowser(queue);
Enumeration msgs = queueBrowser.getEnumeration();
while (msgs.hasMoreElements()) {
    ...
    ActiveMQTextMessage message = (ActiveMQTextMessage) msgs.nextElement();
    ...
    try {
        ...
        message.acknowledge(); // this does nothing

You are actually creating a real consumer and calling consumer.receive(1000), but you're discarding the Message instance that receive() returns. This is the Message instance you must acknowledge to actually consume the message from the queue.

...
// Create a message consumer from the session to the queue.
final MessageConsumer consumer = consumerSession.createConsumer(consumerDestination);
...
while (msgs.hasMoreElements()) {
    ...
    try {
        ...
        Message actualMessage = consumer.receive(1000); // acknowledge this!
        ...

Another important note: the messages that a queue browser receives are not guaranteed to be a static snapshot of the queue. Therefore, it's dangerous to assume that your call to consumer.receive(1000) actually returns the same message that came from the queue browser. In my opinion, you should rework this logic to work only with a MessageConsumer and drop the QueueBrowser.
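As a rough illustration of that suggestion, here is a minimal sketch of a consumer-only loop. It assumes the javax.jms API with the ActiveMQ client library on the classpath (newer client versions may use jakarta.jms instead). The broker URL, the queue name, and the handleMessage stand-in are placeholders and are not taken from the question.

```java
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class SingleConsumerSketch {

    // Hypothetical stand-in for the handleMessage(...) method in the question.
    static String handleMessage(String text) {
        return "processed: " + text;
    }

    public static void main(String[] args) throws JMSException {
        // Assumed broker URL and queue name; replace with your own.
        ConnectionFactory connectionFactory =
                new ActiveMQConnectionFactory("tcp://localhost:61616");
        String queueName = "example.queue";

        Connection connection = connectionFactory.createConnection();
        try {
            connection.start();
            // Non-transacted session; messages stay unacknowledged until
            // acknowledge() is called explicitly.
            Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            Queue queue = session.createQueue(queueName);
            MessageConsumer consumer = session.createConsumer(queue);

            while (true) {
                // receive() returns null if nothing arrives within the timeout.
                Message message = consumer.receive(1000);
                if (message == null) {
                    continue;
                }
                try {
                    if (message instanceof TextMessage) {
                        String text = ((TextMessage) message).getText();
                        System.out.println(handleMessage(text));
                    }
                    // Acknowledge the message returned by receive(), not a browsed copy.
                    // Non-text messages are skipped but still acknowledged in this sketch.
                    message.acknowledge();
                } catch (RuntimeException e) {
                    // Processing failed: restart delivery in this session from the
                    // oldest unacknowledged message.
                    session.recover();
                }
            }
        } finally {
            connection.close();
        }
    }
}
```

With two containers running this loop against the same queue, the broker should dispatch each message to only one of them. Note that in CLIENT_ACKNOWLEDGE mode, acknowledge() acknowledges every message the session has consumed so far, which is fine here because messages are handled one at a time.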

Upvotes: 1
