Cat
Cat

Reputation: 62

ActiveMQ Artemis - Increasing Message Count on Management Queues

I am using the ActiveMQ Artemis management API to consume information about which queues are in use and how many consumers there are.

Since I am caching the ClientRequestor the message count on the "activemq.management.*" queue increases on and on. Although I defined expiry delay the messages on the queue are not discarded.

Can someone explain why?

I would expect, that the messages are consumed and afterwards gone.

private ServerLocator locator;
private ClientSessionFactory defaultFactory;
private ClientSession session;
private ClientRequestor requestor;

public ManagementHelper(String defaultURL) {
    this.locator = ActiveMQClient.createServerLocator(defaultURL);
    this.defaultFactory = locator.createSessionFactory();
    this.session = factory.createSession(this.username, this.password, false, true, true, locator.isPreAcknowledge(), locator.getAckBatchSize());
    this.requestor = new ClientRequestor(session, "activemq.management");
}


@Scheduled(fixedRateString = "20000")
public void getConsumerNbos(ClientSessionFactory factory, ServerLocator locator) {
    ClientMessage message = session.createMessage(false);
    ManagementHelper.putOperationInvocation(message, ResourceNames.BROKER, listAllConsumersAsJSON);
    session.start();

    ClientMessage replyConsumer = requestor.request(message);
    String resultJSON = (String) ManagementHelper.getResult(replyConsumer, String.class);

    ClientMessage message2 = session.createMessage(false);
    ManagementHelper.putOperationInvocation(message2, ResourceNames.BROKER, MANAGEMENT_OPERATION_QUEUES);

    ClientMessage replyQueueNames = requestor.request(message2);
    Object[] objQueueNames = (Object[]) ManagementHelper.getResult(replyQueueNames);
}

Upvotes: 1

Views: 478

Answers (1)

Justin Bertram
Justin Bertram

Reputation: 34988

The messages are not removed because your application is not acknowledging them. You must invoke acknowledge() on the reply messages just like any other message you might receive from a queue using the core API, e.g.:

@Scheduled(fixedRateString = "20000")
public void getConsumerNbos(ClientSessionFactory factory, ServerLocator locator) {
    ClientMessage message = session.createMessage(false);
    ManagementHelper.putOperationInvocation(message, ResourceNames.BROKER, listAllConsumersAsJSON);
    session.start();

    ClientMessage replyConsumer = requestor.request(message);
    replyConsumer.acknowledge();
    String resultJSON = (String) ManagementHelper.getResult(replyConsumer, String.class);

    ClientMessage message2 = session.createMessage(false);
    ManagementHelper.putOperationInvocation(message2, ResourceNames.BROKER, MANAGEMENT_OPERATION_QUEUES);

    ClientMessage replyQueueNames = requestor.request(message2);
    replyQueueNames.acknowledge();
    Object[] objQueueNames = (Object[]) ManagementHelper.getResult(replyQueueNames);
}

You don't need to invoke commit() on the ClientSession since you're creating it with autoCommitAcks as true.

Upvotes: 2

Related Questions