azazel

Reputation: 41

Multi-threaded JMS message consumption

I need to concurrently consume messages from a queue. The order of the messages is luckily not important.

I have come up with a solution, but I am not sure whether it is correct. It hands each message to a thread pool, which reuses idle threads and eventually terminates them. But isn't consuming messages like this thread-unsafe?

In addition, the current solution uses AUTO_ACKNOWLEDGE; ideally I would like to replace it with CLIENT_ACKNOWLEDGE. Unfortunately, as I am using a single session, all the messages will get acknowledged at once, so it seems there is no easy way of doing it. ActiveMQ Artemis has no option for INDIVIDUAL_ACKNOWLEDGE like ActiveMQ "Classic" did.

Does anyone know whether this solution is okay, and/or have any ideas on how to improve it so that I can use CLIENT_ACKNOWLEDGE?

private void createConsumer(Connection connection, String queueName, int maxPoolSize) {
    try {
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue(queueName);
        MessageConsumer messageConsumer = session.createConsumer(queue);
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                0, maxPoolSize,
                60L,
                TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>());

        messageConsumer.setMessageListener(message -> {
            try {
                executor.submit(() -> {
                    messageHandler.handleMessage(message);
                });
                message.acknowledge();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        });
    } catch (JMSException e) {
        e.printStackTrace();
    }
}
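
For reference, the executor configuration above can be exercised on its own. The following is a minimal standalone sketch, not part of the original solution; the class and method names (PoolSaturationDemo, extraTaskIsRejected) are hypothetical. It uses the same constructor arguments and shows that, because a SynchronousQueue holds no tasks, a submission made while all maxPoolSize threads are busy is rejected with the default rejection policy.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolSaturationDemo {

    /**
     * Occupies every thread of a pool configured like the one in the question,
     * then submits one more task. Returns true if that extra task is rejected.
     */
    public static boolean extraTaskIsRejected(int maxPoolSize) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                0, maxPoolSize,
                60L,
                TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>());
        CountDownLatch release = new CountDownLatch(1);
        try {
            // Fill the pool with tasks that block until the latch is released.
            for (int i = 0; i < maxPoolSize; i++) {
                executor.submit(() -> {
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            try {
                // No idle thread is waiting on the queue and no new thread may be created.
                executor.submit(() -> { });
                return false;
            } catch (RejectedExecutionException e) {
                return true;
            }
        } finally {
            release.countDown();  // unblock the workers
            executor.shutdown();  // let the pool wind down
        }
    }

    public static void main(String[] args) {
        System.out.println("extra task rejected: " + extraTaskIsRejected(2));
    }
}
```

RejectedExecutionException is unchecked, so a catch block for JMSException alone would not intercept it inside a message listener.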

Upvotes: 0

Views: 927

Answers (1)

Justin Bertram

Reputation: 35152

ActiveMQ Artemis does have an INDIVIDUAL_ACKNOWLEDGE mode for consumers. You just need to use ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE as noted in the documentation.
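
As a rough sketch of how that mode could be used (assuming the javax.jms API and the Artemis JMS client on the classpath; the class name IndividualAckConsumer and the method listen are hypothetical), the constant is passed as the acknowledge mode when the session is created, and each message is acknowledged only after it has been handled:

```java
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSConstants;

public class IndividualAckConsumer {

    /**
     * Creates a consumer whose session uses Artemis' INDIVIDUAL_ACKNOWLEDGE mode,
     * so acknowledging one message does not acknowledge any other message.
     * The handler parameter stands in for the question's messageHandler.
     */
    public static MessageConsumer listen(Connection connection, String queueName,
                                         MessageListener handler) throws JMSException {
        Session session = connection.createSession(
                false, ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE);
        MessageConsumer consumer = session.createConsumer(session.createQueue(queueName));
        consumer.setMessageListener(message -> {
            try {
                handler.onMessage(message);  // process first
                message.acknowledge();       // then acknowledge this message only
            } catch (JMSException | RuntimeException e) {
                // Not acknowledged; what happens next depends on the redelivery setup.
                e.printStackTrace();
            }
        });
        return consumer;
    }
}
```

Here the handling and the acknowledgement both happen on the listener's delivery thread, so the session is never used concurrently.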

Also, javax.jms.Connection is thread-safe, but sessions, consumers, producers, etc. are not, so you can't use those concurrently. However, your code isn't actually using any of them concurrently.

That said, you're still just creating a single consumer here. I think you'd get even better performance, and perhaps simpler code as well, if you just created multiple sessions, consumers, listeners, etc. The client implementation will take care of invoking each of your listeners in its own thread as messages arrive, so you won't have to manage any of the threads yourself.
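
A minimal sketch of that multi-session layout, assuming the javax.jms API; the names MultiSessionConsumers, createConsumers, and the MessageHandler interface are hypothetical stand-ins for the code in the question. The acknowledge mode is a parameter, so either Session.CLIENT_ACKNOWLEDGE or the Artemis-specific individual mode could be passed in:

```java
import java.util.ArrayList;
import java.util.List;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;

public class MultiSessionConsumers {

    /** Hypothetical callback standing in for the question's messageHandler. */
    public interface MessageHandler {
        void handleMessage(Message message) throws Exception;
    }

    /**
     * Creates consumerCount sessions on one connection, each with its own
     * consumer and listener. Each session is only touched by its own
     * delivery thread, so no extra thread pool is needed.
     */
    public static List<Session> createConsumers(Connection connection, String queueName,
                                                int consumerCount, int ackMode,
                                                MessageHandler handler) throws JMSException {
        List<Session> sessions = new ArrayList<>();
        for (int i = 0; i < consumerCount; i++) {
            Session session = connection.createSession(false, ackMode);
            MessageConsumer consumer = session.createConsumer(session.createQueue(queueName));
            consumer.setMessageListener(message -> {
                try {
                    handler.handleMessage(message);
                    message.acknowledge();  // acknowledge only after successful handling
                } catch (Exception e) {
                    e.printStackTrace();    // left unacknowledged on failure
                }
            });
            sessions.add(session);
        }
        // Assumption: the connection may not have been started yet;
        // calling start() on an already started connection is ignored.
        connection.start();
        return sessions;
    }
}
```

With Session.CLIENT_ACKNOWLEDGE, acknowledge() covers every message that session has consumed so far, including an earlier one whose handling failed; an individual acknowledge mode avoids that.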

Upvotes: 2
