Reputation: 62
I am using the ActiveMQ Artemis management API to consume information about which queues are in use and how many consumers there are.
Since I am caching the ClientRequestor, the message count on the "activemq.management.*" queue keeps increasing. Although I defined an expiry delay, the messages on the queue are not discarded.
Can someone explain why?
I would expect the messages to be consumed and then removed.
    private ServerLocator locator;
    private ClientSessionFactory defaultFactory;
    private ClientSession session;
    private ClientRequestor requestor;

    public ManagementHelper(String defaultURL) throws Exception {
        this.locator = ActiveMQClient.createServerLocator(defaultURL);
        this.defaultFactory = locator.createSessionFactory();
        // Arguments after the credentials: xa = false, autoCommitSends = true, autoCommitAcks = true
        this.session = defaultFactory.createSession(this.username, this.password, false, true, true, locator.isPreAcknowledge(), locator.getAckBatchSize());
        this.requestor = new ClientRequestor(session, "activemq.management");
    }
    @Scheduled(fixedRateString = "20000")
    public void getConsumerNbos(ClientSessionFactory factory, ServerLocator locator) {
        ClientMessage message = session.createMessage(false);
        ManagementHelper.putOperationInvocation(message, ResourceNames.BROKER, listAllConsumersAsJSON);
        session.start();
        ClientMessage replyConsumer = requestor.request(message);
        String resultJSON = (String) ManagementHelper.getResult(replyConsumer, String.class);

        ClientMessage message2 = session.createMessage(false);
        ManagementHelper.putOperationInvocation(message2, ResourceNames.BROKER, MANAGEMENT_OPERATION_QUEUES);
        ClientMessage replyQueueNames = requestor.request(message2);
        Object[] objQueueNames = (Object[]) ManagementHelper.getResult(replyQueueNames);
    }
Upvotes: 1
Views: 478
Reputation: 34988
The messages are not removed because your application is not acknowledging them. You must invoke acknowledge() on the reply messages, just like any other message you receive from a queue using the core API, e.g.:
    @Scheduled(fixedRateString = "20000")
    public void getConsumerNbos(ClientSessionFactory factory, ServerLocator locator) {
        ClientMessage message = session.createMessage(false);
        ManagementHelper.putOperationInvocation(message, ResourceNames.BROKER, listAllConsumersAsJSON);
        session.start();
        ClientMessage replyConsumer = requestor.request(message);
        // Acknowledge the reply so the broker can remove it from the reply queue
        replyConsumer.acknowledge();
        String resultJSON = (String) ManagementHelper.getResult(replyConsumer, String.class);

        ClientMessage message2 = session.createMessage(false);
        ManagementHelper.putOperationInvocation(message2, ResourceNames.BROKER, MANAGEMENT_OPERATION_QUEUES);
        ClientMessage replyQueueNames = requestor.request(message2);
        // Same for the second reply
        replyQueueNames.acknowledge();
        Object[] objQueueNames = (Object[]) ManagementHelper.getResult(replyQueueNames);
    }
You don't need to invoke commit() on the ClientSession since you're creating it with autoCommitAcks set to true.
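To illustrate why the count keeps climbing, here is a minimal toy model in plain Java. It is only a sketch and does not use the Artemis client: `ToyQueue`, `poll`, and `AckDemo` are hypothetical names made up for this example. It assumes, as described above, that a delivered message still counts against the queue until the consumer acknowledges it.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Set;

public class AckDemo {

    /** Toy stand-in for a broker-side reply queue (hypothetical, not Artemis code). */
    static final class ToyQueue {
        private final Deque<Integer> ready = new ArrayDeque<>(); // waiting to be delivered
        private final Set<Integer> delivering = new HashSet<>(); // delivered, not yet acknowledged
        private int nextId = 0;

        /** The "broker" puts a reply on the queue. */
        void send() {
            ready.addLast(nextId++);
        }

        /** Delivers the next message; it still counts until it is acknowledged. */
        Integer receive() {
            Integer id = ready.pollFirst();
            if (id != null) {
                delivering.add(id);
            }
            return id;
        }

        /** Only an acknowledgement actually removes the message. */
        void acknowledge(int id) {
            delivering.remove(id);
        }

        /** Count of messages the queue still holds, including unacknowledged ones. */
        int messageCount() {
            return ready.size() + delivering.size();
        }
    }

    /** Simulates a number of scheduled request/reply rounds and returns the final count. */
    static int poll(ToyQueue queue, int rounds, boolean acknowledge) {
        for (int i = 0; i < rounds; i++) {
            queue.send();                    // reply arrives on the queue
            Integer reply = queue.receive(); // the requestor hands the reply to the caller
            if (acknowledge && reply != null) {
                queue.acknowledge(reply);
            }
        }
        return queue.messageCount();
    }

    public static void main(String[] args) {
        System.out.println("without ack: " + poll(new ToyQueue(), 5, false)); // prints "without ack: 5"
        System.out.println("with ack:    " + poll(new ToyQueue(), 5, true));  // prints "with ack:    0"
    }
}
```

In this model every scheduled run without an acknowledgement leaves one more message behind, which matches the growth you are seeing on the reply queue with a cached ClientRequestor.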
Upvotes: 2