Matt
Matt

Reputation: 8942

Concurrent message consumption in ActiveMQ

I use an ActiveMQServer as a broker.

Server.java

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;

import org.apache.activemq.api.core.TransportConfiguration;
import org.apache.activemq.core.config.Configuration;
import org.apache.activemq.core.config.impl.ConfigurationImpl;
import org.apache.activemq.core.remoting.impl.netty.NettyAcceptorFactory;
import org.apache.activemq.core.server.ActiveMQServer;
import org.apache.activemq.core.server.ActiveMQServers;

/**
 * Launcher for a standalone ActiveMQ core server.
 *
 * <p>The server is configured programmatically: persistence and security are
 * switched off and a single Netty acceptor is registered for
 * {@code localhost:61616}.
 */
public class Server {

    /**
     * Builds the configuration, then creates and starts the server.
     *
     * @param arg command-line arguments (not used)
     * @throws Exception any failure raised while configuring or starting the
     *         server; the stack trace is printed before the exception is rethrown
     */
    public static void main(final String[] arg) throws Exception {
        try {
            // Step 1. Create the Configuration, and set the properties accordingly
            final Configuration brokerConfig = buildConfiguration();

            // Step 2. Create and start the server
            final ActiveMQServer broker = ActiveMQServers.newActiveMQServer(brokerConfig);
            broker.start();
        } catch (Exception failure) {
            failure.printStackTrace();
            throw failure;
        }
    }

    /**
     * Assembles the server configuration: journal directory, persistence and
     * security disabled, and one Netty acceptor.
     */
    private static Configuration buildConfiguration() throws Exception {
        final Configuration brokerConfig = new ConfigurationImpl();

        // The journal directory is only needed for the server lock file.
        brokerConfig.setJournalDirectory("target/data/journal");
        brokerConfig.setPersistenceEnabled(false);
        brokerConfig.setSecurityEnabled(false);

        // These acceptor parameters are not strictly necessary (per the original
        // author they restate the defaults). If they are changed to run server and
        // client on different hosts, the client's connector must be changed to match.
        final Map<String, Object> acceptorParams = new HashMap<String, Object>();
        acceptorParams.put("host", "localhost");
        acceptorParams.put("port", 61616);

        final HashSet<TransportConfiguration> acceptors = new HashSet<TransportConfiguration>();
        acceptors.add(new TransportConfiguration(NettyAcceptorFactory.class.getName(), acceptorParams));
        brokerConfig.setAcceptorConfigurations(acceptors);

        return brokerConfig;
    }
}

I also have a Client.java class, in which I create a queue, a message producer and a message consumer.

Client.java

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

import org.apache.activemq.api.core.ActiveMQException;
import org.apache.activemq.api.core.SimpleString;
import org.apache.activemq.api.core.TransportConfiguration;
import org.apache.activemq.api.core.client.ActiveMQClient;
import org.apache.activemq.api.core.client.ClientConsumer;
import org.apache.activemq.api.core.client.ClientMessage;
import org.apache.activemq.api.core.client.ClientProducer;
import org.apache.activemq.api.core.client.ClientSession;
import org.apache.activemq.api.core.client.ClientSessionFactory;
import org.apache.activemq.api.core.client.MessageHandler;
import org.apache.activemq.api.core.client.ServerLocator;
import org.apache.activemq.core.remoting.impl.netty.NettyConnectorFactory;

public class Client
{

    private static final String queueName = "queue.exampleQueue";
    private static final String propName = "myprop";

    ClientSessionFactory sf = null;
    ClientSession session = null;
    ClientProducer producer = null;
    ClientMessage message = null;
    ClientConsumer consumer = null;
    String name;

    public Client(String name){
        this.name = name;
    }

    public void initializeComponents(){ 

        try
          {          
            // Step 3. As we are not using a JNDI environment we instantiate the objects directly

             /**
              * this map with configuration values is not necessary (it configures the default values).
              * If you modify it to run the example in two different hosts, remember to also modify the
              * server's Acceptor at {@link EmbeddedServer}
              */
             Map<String,Object> map = new HashMap<String,Object>();
             map.put("host", "localhost");
             map.put("port", 61616);
             // -------------------------------------------------------

             ServerLocator serverLocator = ActiveMQClient.createServerLocatorWithoutHA(new TransportConfiguration(NettyConnectorFactory.class.getName(), map));
             sf = serverLocator.createSessionFactory();

             // Step 4. Create a core queue
             ClientSession coreSession = sf.createSession(false, false, false);

             if (!coreSession.queueQuery(new SimpleString(queueName)).isExists())
                 coreSession.createTemporaryQueue(queueName, queueName);

             coreSession.close();

            // Step 5. Create the session, and producer
            session = sf.createSession();

            producer = session.createProducer(queueName); // 

            // Step 7. Create the message consumer and start the connection
            consumer = session.createConsumer(queueName); 
            session.start();

            // Step 8. Receive the message.
            consumer.setMessageHandler(new MessageHandler(){

                public void onMessage(ClientMessage message)
                {
                    System.out.println("client " + name + " received message " + message.getStringProperty(propName));
                }

            });

          }
          catch (Exception e)
          {
             e.printStackTrace();
          }

    }

    public void sendMessage(String messageText){

        // Step 6. Create and send a message
        message = session.createMessage(false);
        message.putStringProperty(propName, messageText);

        try {
            System.out.println("Producer is going to send a message");
            producer.send(message);
        } catch (ActiveMQException e) {
            e.printStackTrace();
        }
    }

    public void cleanUpConnection(){
        if (sf != null)
        {
           sf.close();
        }
    }

}

In this main I create one queue, two producers and two consumers.

       /**
        * Demo driver: starts two clients on the same queue and lets the first one
        * send ten timestamped messages, two seconds apart.
        *
        * <p>Both clients are always cleaned up, even if sending fails or the
        * thread is interrupted while sleeping.
        *
        * @param args command-line arguments (not used)
        */
       public static void main(final String[] args)
       {
           Client cl1 = new Client("cl1");
           Client cl2 = new Client("cl2");

           // Built once: the pattern is the same for every iteration.
           SimpleDateFormat timestampFormat = new SimpleDateFormat("MM/dd/yyyy HH:mm:ss");

           try {
               cl1.initializeComponents();
               cl2.initializeComponents();

               for (int i = 0; i < 10; i++) {
                   String formattedDate = timestampFormat.format(new Date());

                   cl1.sendMessage(formattedDate + " number of iteration " + i);
                   Thread.sleep(2000);
               }
           } catch (InterruptedException e) {
               // Stop sending and restore the interrupt flag instead of ignoring the request.
               Thread.currentThread().interrupt();
               e.printStackTrace();
           } finally {
               try {
                   cl1.cleanUpConnection();
               } finally {
                   cl2.cleanUpConnection();
               }
           }
       }

and this is the output of main:

Producer is going to send a message
client cl1 received message 05/06/2015 16:56:22 number of iteration 0
Producer is going to send a message
client cl2 received message 05/06/2015 16:56:24 number of iteration 1
Producer is going to send a message
client cl1 received message 05/06/2015 16:56:26 number of iteration 2
Producer is going to send a message
client cl2 received message 05/06/2015 16:56:28 number of iteration 3
Producer is going to send a message
client cl1 received message 05/06/2015 16:56:30 number of iteration 4
Producer is going to send a message
client cl2 received message 05/06/2015 16:56:32 number of iteration 5
Producer is going to send a message
client cl1 received message 05/06/2015 16:56:34 number of iteration 6
Producer is going to send a message
client cl2 received message 05/06/2015 16:56:36 number of iteration 7
Producer is going to send a message
client cl1 received message 05/06/2015 16:56:38 number of iteration 8
Producer is going to send a message
client cl2 received message 05/06/2015 16:56:40 number of iteration 9
client cl2 received message 05/06/2015 16:56:22 number of iteration 0
client cl2 received message 05/06/2015 16:56:26 number of iteration 2
client cl2 received message 05/06/2015 16:56:30 number of iteration 4
client cl2 received message 05/06/2015 16:56:34 number of iteration 6
client cl2 received message 05/06/2015 16:56:38 number of iteration 8

I would like to ask: how can I achieve concurrent message consumption, so that every consumer receives every message?

I mean:

client cl1 received ... message of iteration 0
client cl2 received ... message of iteration 0
client cl1 received ... message of iteration 1
client cl2 received ... message of iteration 1

I found the prefetch limit settings, but I don't know how to use the ActiveMQConnectionFactory and ActiveMQConnection classes without refactoring the Client.java class. Are there any other options to make message consumption concurrent for all consumers?

Upvotes: 1

Views: 1818

Answers (1)

Ben Damer
Ben Damer

Reputation: 1036

You are only sending a single message per iteration, thus only a single client will receive a message. If you want each client to receive the same messages, use a topic instead of a queue.

Upvotes: 1

Related Questions