Wojciech Piotrowiak
Wojciech Piotrowiak

Reputation: 129

Messages are not consumed when connection use JNDI or ActiveMQConnectionFactory to connect to EmbeddedActiveMQ

This is follow-up to this question.

My code can initiate connection, session etc., however messages are not consumed. I don't see any exceptions in logs.

This test reproduces the problem:

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.naming.Context;

import java.io.File;
import java.util.Hashtable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
import org.apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory;
import org.junit.After;
import org.junit.Before;

public class Test {

    EmbeddedActiveMQ jmsServer;

    final String QUEUE_NAME = "myQueue";

    @Before
    public void setUp() throws Exception {
        final String baseDir = File.separator + "tmp";
        final EmbeddedActiveMQ embeddedActiveMQ = new EmbeddedActiveMQ();
        final Configuration config = new ConfigurationImpl();
        config.setPersistenceEnabled(true);
        config.setBindingsDirectory(baseDir + File.separator + "bindings");
        config.setJournalDirectory(baseDir + File.separator + "journal");
        config.setPagingDirectory(baseDir + File.separator + "paging");
        config.setLargeMessagesDirectory(baseDir + File.separator + "largemessages");
        config.setSecurityEnabled(false);

        AddressSettings adr = new AddressSettings();
        adr.setDeadLetterAddress(new SimpleString("DLQ"));
        adr.setExpiryAddress(new SimpleString("ExpiryQueue"));
        config.addAddressSetting("#", adr);

        config.addAcceptorConfiguration("invmConnectionFactory", "vm://0");
        embeddedActiveMQ.setConfiguration(config);
        this.jmsServer = embeddedActiveMQ;
        this.jmsServer.start();

        System.out.println("creating queue");
        final boolean isSuccess = jmsServer.getActiveMQServer().createQueue(new QueueConfiguration(QUEUE_NAME)) != null;
        if(isSuccess) {
            System.out.println(QUEUE_NAME + "queue created");
        }

    }

    @After
    public void tearDown() {
        try {
            this.jmsServer.stop();
        } catch(Exception e) {
            // ignore
        }
    }

    @org.junit.Test
    public void simpleTest() throws Exception {

        Hashtable d = new Hashtable();
        d.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory");
        d.put("connectionFactory.invmConnectionFactory", "vm://0");
        final ActiveMQInitialContextFactory activeMQInitialContextFactory = new ActiveMQInitialContextFactory();
        Context initialContext = activeMQInitialContextFactory.getInitialContext(d);
        ConnectionFactory connectionFactory = (ConnectionFactory) initialContext.lookup("invmConnectionFactory");
        Connection connection = connectionFactory.createConnection();
        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
        Queue queue = session.createQueue(QUEUE_NAME);

        MessageProducer producer = session.createProducer(queue);
        MessageConsumer consumer = session.createConsumer(queue);
        CountDownLatch latch = new CountDownLatch(1);
        consumer.setMessageListener(message -> {
            System.out.println("=== " + message);
            try {
                message.acknowledge();
                session.commit();
                latch.countDown();
            } catch(JMSException e) {
                e.printStackTrace();
            }
        });

        connection.start();
        producer.send(session.createMessage());
        session.commit();
        if(!latch.await(2, TimeUnit.SECONDS)) {
            throw new IllegalStateException();
        }
        connection.close();
    }
}

Upvotes: 0

Views: 240

Answers (1)

Justin Bertram
Justin Bertram

Reputation: 35038

The problem with this code is subtle but important. When configuring the broker you're creating a queue like so:

...
final String QUEUE_NAME = "myQueue";
...
jmsServer.getActiveMQServer().createQueue(new QueueConfiguration(QUEUE_NAME))
...

This is perfectly valid in and of itself, but for this use-case involving a JMS queue it's important to note that this will result in an address named myQueue and a multicast queue named myQueue since the default routing type is MULTICAST and you didn't specify any routing type on your QueueConfiguration. This is not the kind of configuration you want for a JMS queue. You want an address and an ANYCAST queue of the same name (i.e. myQueue in this case) as noted in the documentation. Therefore, you should use:

...
import org.apache.activemq.artemis.api.core.RoutingType;
...
jmsServer.getActiveMQServer().createQueue(new QueueConfiguration(QUEUE_NAME).setRoutingType(RoutingType.ANYCAST))

When you use the multicast queue the message sent by the JMS client will not actually be routed because it is sent with the anycast routing type.

Another option would be to not create the queue explicitly at all and allow it to be auto-created.

Upvotes: 1

Related Questions