Tobb
Tobb

Reputation: 12205

MessageConsumer.receive() doesn't remove message

I am trying to write a test for a class used to send a JMS-message to ActiveMQ. What I am trying to accomplish is to get a method in the class under test to send the message to an ActiveMQ instance in localhost, and then pick the message up in the test and verify that it is correct.

I have chosen this as my broker url: vm://localhost?broker.persistent=true, which means that a local ActiveMQ instance will be created, and the messages stored in a KahaDB (which is also created.) (I tried using broker.persistent=false, but since the method under test has a finally-clause that closes the connection, the in-memory messages are then lost before I can retrieve them.)

In order to retrieve the message and verify it, I have the following code:

    //call method under test to send a message
    //create a ConnectionFactory with url vm://localhost?broker.persistent=true
    final Connection connection = connectionFactory.createConnection();
    connection.start();

    final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
    final Destination dest = session.createQueue("my.queue");

    final MessageConsumer messageConsumer = session.createConsumer(dest);
    Message message = messageConsumer.receive(1000);

    messageConsumer.close();
    session.close();
    connection.close();

My problem is that upon running this code, the messages are not being removed from KahaDb! Upon multiple test runs, the message added the first time will be read again and again. Am I missing something here, or is this a bug in KahaDB/ActiveMQ? I am using ActiveMQ 5.7.0.

Upvotes: 1

Views: 5330

Answers (2)

Beryllium
Beryllium

Reputation: 12998

Try

final Session session = 
  connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

otherwise you get a "transacted" session.


Otherwise, if you really want to have a "transacted" session, you have to call

// 2nd parameter is ignored, if the session is transacted
final Session session = 
  connection.createSession(true, -1);

// Read messages

session.commit();
messageConsumer.close();
session.close();
connection.close();

in order to remove all messages you have read during this session.


For your reference, there is an excellent overview from Javaworld regarding Transactions and redelivery in JMS. It covers additional possibilities as well (using Session.CLIENT_ACKNOWLEDGE to acknowledge messages individually, for example).

Upvotes: 2

Tim Bish
Tim Bish

Reputation: 18366

You've created a transacted session but never called commit. In this case when the close method is called the in-flight transaction is rolled back and so the message that you received is placed back into the queue and will be redelivered to another consumer. You can test this by querying the redelivered count on the message and see that it increases each time. To consume the message call session.commit() before closing the session.

Upvotes: 2

Related Questions