Reputation: 2109
I have an application with several activeMq queues. I would like to list the messages in them and remove any of them from any of the queues based on the id of the message.
Here is my code so far.
public void killMessage(String id) {
try {
ActiveMQConnection activeMqConnection = (ActiveMQConnection) connectionFactory.createConnection();
activeMqConnection.start();
DestinationSource destinationSource = activeMqConnection.getDestinationSource();
Set<ActiveMQQueue> queues = destinationSource.getQueues();
QueueSession queueSession = activeMqConnection.createQueueSession(true, Session.CLIENT_ACKNOWLEDGE);
for(ActiveMQQueue queue : queues) {
QueueBrowser browser = queueSession.createBrowser(queue);
Enumeration<?> messagesInQueue = browser.getEnumeration();
while (messagesInQueue.hasMoreElements()) {
Message message = (Message) messagesInQueue.nextElement();
System.out.println("Current id: " + message.getJMSMessageID());
if(message.getJMSMessageID().equals(id)){
System.out.println("-----message id found-------");
}
}
}
activeMqConnection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
I iterate through all the queues, then I iterate through all messages in each queue. I even find the message I want to delete, but I cannot find a way to remove it from the queue.
Edit:
I also created a consumer. I am not sure how the consumer should make the messages disappear from the queue. My attempt at it, which have no effect at all, messages remain in the queue, and I get no error message and no exception is thrown which could indicate the consumer did not match a message:
if(message.getJMSMessageID().equals(id)){
System.out.println("-----message id found-------");
MessageConsumer consumer = queueSession.createConsumer(queue, "JMSMessageID='" + id + "'");
consumer.receive();
consumer.close();
}
Upvotes: 2
Views: 6584
Reputation: 34998
If you want to use the JMS API to do this then you'll have to create a consumer and use a selector to consume the message with the ID that you want. A queue browser cannot consume messages; it can only browse them.
In the code you pasted you're creating a transacted session which means when you consume the message you'll need to commit the session otherwise the message will never be acknowledged. That said, you're probably better off creating a non-transacted session with AUTO_ACKNOWLEDGE
instead.
Also, you probably want to call receive(int)
(i.e. with a timeout) so that if the selector can't find the message for some reason your application doesn't just sit there forever waiting on the method to return.
Upvotes: 1