Reputation: 41
I need to concurrently consume messages from a queue. The order of the messages is luckily not important.
I have come up with a solution, but I am not sure how correct it is. The solution handles messages on multiple threads, reuses those threads, and eventually terminates idle ones. But isn't consuming messages like this thread-unsafe?
In addition, the current solution uses AUTO_ACKNOWLEDGE; ideally I would like to replace it with CLIENT_ACKNOWLEDGE.
Unfortunately, as I am using a single session, all the messages will get acknowledged at once, so it seems there is no easy way of doing it.
ActiveMQ Artemis has no option for INDIVIDUAL_ACKNOWLEDGE like ActiveMQ "Classic" did.
Does anyone know whether this solution is okay, or have any ideas on how to improve it so that I can use CLIENT_ACKNOWLEDGE?
private void createConsumer(Connection connection, String queueName, int maxPoolSize) {
    try {
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue(queueName);
        MessageConsumer messageConsumer = session.createConsumer(queue);
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                0, maxPoolSize,
                60L,
                TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>());
        messageConsumer.setMessageListener(message -> {
            try {
                executor.submit(() -> {
                    messageHandler.handleMessage(message);
                });
                message.acknowledge();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        });
    } catch (JMSException e) {
        e.printStackTrace();
    }
}
Upvotes: 0
Views: 927
Reputation: 35152
ActiveMQ Artemis does have an INDIVIDUAL_ACKNOWLEDGE mode for consumers. You just need to use ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE as noted in the documentation.
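As a minimal sketch of what that might look like, assuming the javax.jms flavor of the Artemis JMS client is on the classpath: the class name IndividualAckExample and the Handler interface are hypothetical stand-ins (Handler plays the role of the messageHandler in the question), and the error handling is only illustrative.

```java
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSConstants;

public class IndividualAckExample {

    /** Hypothetical stand-in for the question's messageHandler. */
    public interface Handler {
        void handleMessage(Message message) throws Exception;
    }

    public static MessageConsumer createConsumer(Connection connection,
                                                 String queueName,
                                                 Handler handler) throws JMSException {
        // Non-transacted session using the Artemis-specific acknowledge mode.
        Session session = connection.createSession(
                false, ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE);
        MessageConsumer consumer =
                session.createConsumer(session.createQueue(queueName));

        consumer.setMessageListener(message -> {
            try {
                handler.handleMessage(message);
                // Acknowledge only after handling succeeded. In this mode the
                // call applies to this message alone, not the whole session.
                message.acknowledge();
            } catch (Exception e) {
                // The message is left unacknowledged if handling fails.
                e.printStackTrace();
            }
        });
        return consumer;
    }
}
```

Here the handler runs on the listener thread, so the acknowledgement happens strictly after the work is done. The connection still has to be started with connection.start() before messages are delivered.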
Also, basically the only JMS object that is really thread-safe is javax.jms.Connection, so you can't use sessions, consumers, producers, etc. concurrently. However, in your code you're not actually using any of these concurrently.
That said, you're still just creating a single consumer here. I think you'd get even better performance and perhaps simpler code as well if you just created multiple sessions, consumers, listeners, etc. The client implementation will take care of invoking each of your listeners in their own thread as messages arrive so you won't have to manage any of the threads yourself.
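A rough sketch of that multi-session approach, assuming javax.jms and a listener that is safe to call from several threads at once. The class name MultiConsumerExample and the consumerCount parameter are made up for illustration; consumerCount plays roughly the role of maxPoolSize in the question.

```java
import java.util.ArrayList;
import java.util.List;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;

public class MultiConsumerExample {

    public static List<Session> startConsumers(Connection connection,
                                               String queueName,
                                               int consumerCount,
                                               MessageListener listener) throws JMSException {
        List<Session> sessions = new ArrayList<>();
        for (int i = 0; i < consumerCount; i++) {
            // One session per consumer: a session must not be shared between
            // threads, but a single connection can create many sessions.
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageConsumer consumer =
                    session.createConsumer(session.createQueue(queueName));
            // The same listener instance is registered on every consumer,
            // so it has to be thread-safe.
            consumer.setMessageListener(listener);
            sessions.add(session);
        }
        // Delivery begins once the connection is started.
        connection.start();
        return sessions;
    }
}
```

The returned sessions are kept so the caller can close them on shutdown. No executor is needed in this variant, since each session delivers to its listener on a client-managed thread.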
Upvotes: 2