littleK
littleK

Reputation: 20163

Getting Better Performance with ActiveMQ than RabbitMQ

I'm wondering if it is unusual to achieve faster raw messaging throughput (for both publishing and consuming) with ActiveMQ as opposed to RabbitMQ? I'm asking because EVERY other online reference that I have come across boasts RabbitMQ as being faster.

I'm not testing with legitimate benchmarking tools; rather I modified the basic publisher/consumer examples for both to test 100,000 messages with a 3 kilobyte message body. Please note that I am testing both publish and consume across two different Amazon EC2 x-large instances. Perhaps I have not set up my code correctly? Please see my results and code below.

ActiveMQ Send 3kb   
Average Time per Message (ns):  497276.1179
Average # Messages per second:  2010.935101
Total Time (s):                 49.72810906

ActiveMQ Recv 3kb   
Average Time per Message (ns):  43813.35476
Average # Messages per second:  22823.86285
Total Time (s):                 4.381379289

RabbitMQ Send 3kb   
Average Time per Message (ns):  1041524.626
Average # Messages per second:  960.1309229
Total Time (s):                 104.1524626

RabbitMQ Recv 3kb   
Average Time per Message (ns):  612559.3732
Average # Messages per second:  1632.494814
Total Time (s):                 61.25593732

Updated numbers after removing queueDeclare() in RabbitMQ Send.java & Recv.java:

This dramatically improved RabbitMQ's times, but something must be off with the ActiveMQ consumption time of only 4 seconds...

ActiveMQ Send 3kb   
Average Time per Message (ns):  491404.5666
Average # Messages per second:  2034.983124
Total Time (s):                 49.14045666

ActiveMQ Recv 3kb   
Average Time per Message (ns):  41976.17158
Average # Messages per second:  23823.03965
Total Time (s):                 4.197617158

RabbitMQ Send 3kb   
Average Time per Message (ns):  354795.8818
Average # Messages per second:  2818.522005
Total Time (s):                 35.47958818

RabbitMQ Recv 3kb   
Average Time per Message (ns):  440349.3892
Average # Messages per second:  2270.924009
Total Time (s):                 44.03493892

ActiveMQ Send.java

public class Send implements Runnable {

private final static int NUMBER_OF_MESSAGES = 100000;
private static long startTime = 0;
private static long stopTime = 0;
private static long totalTime = 0;

public static void main(String[] argv) throws java.io.IOException {
    (new Thread(new Send())).start();
}

public void run() {
    try {
        // Create a ConnectionFactory
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");

        // Create a Connection
        Connection connection = connectionFactory.createConnection();
        connection.start();

        // Create a Session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // Create the destination (Topic or Queue)
        Destination destination = session.createQueue("TEST.FOO");

        // Create a MessageProducer from the Session to the Topic or Queue
        MessageProducer producer = session.createProducer(destination);
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

        for (int i=0; i <= NUMBER_OF_MESSAGES; i++){
            startTime = System.nanoTime();

            // 3kb
            String text = "AMFu8UlKW2zJBxUQbxNfU3HneB11uEOeC..."

            TextMessage message = session.createTextMessage(text);

// Tell the producer to send the message
            //System.out.println("Sent message: "+ message.hashCode() + " : " + Thread.currentThread().getName());
            producer.send(message);
            stopTime = System.nanoTime();
            totalTime = totalTime + stopTime-startTime;
            System.out.println(i + "," + Long.toString(stopTime-startTime));

        }

        // Clean up
        session.close();
        connection.close();

        //System.out.println("");
        //System.out.println("Total Time: " + totalTime + "ns");
        //System.out.println("Avg. Time: " + totalTime/NUMBER_OF_MESSAGES + "ns");
        //System.out.println("");

    }
    catch (Exception e) {
        System.out.println("Caught: " + e);
        e.printStackTrace();
    }
}
}

ActiveMQ Recv.java

public class Recv implements Runnable {

private static long startTime = 0;
private static long stopTime = 0;
private static long totalTime = 0;
private static int numMessages = 0;

public static void main(String[] argv)
    throws java.io.IOException {

    (new Thread(new Recv())).start();

}

public void run() {
    try {

        // Create a ConnectionFactory
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://x.x.x.x:61616");

        // Create a Connection
        Connection connection = connectionFactory.createConnection();
        connection.start();

        // Create a Session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // Create the destination (Topic or Queue)
        Destination destination = session.createQueue("TEST.FOO");

        // Create a MessageConsumer from the Session to the Topic or Queue
        MessageConsumer consumer = session.createConsumer(destination);

        // Message Listener
        MyListener listener = new MyListener();
        consumer.setMessageListener(listener);

        // Wait for a message
        //Message message = consumer.receive(1000);

       // consumer.close();
       // session.close();

// connection.close();
    } catch (Exception e) {
        System.out.println("Caught: " + e);
        e.printStackTrace();
    }
}

public class MyListener implements MessageListener {
    public void onMessage(Message message) {
        try {
            startTime = System.nanoTime();
            if (message instanceof TextMessage) {
                TextMessage textMessage = (TextMessage) message;
                String text = textMessage.getText();
                stopTime = System.nanoTime();
                totalTime = totalTime + stopTime-startTime;

                System.out.println(numMessages + "," + Long.toString(stopTime-startTime));

                numMessages++;

            } else {
                System.out.println("Received: " + message);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
}

RabbitMQ Send.java

public class Send implements Runnable {

private final static String QUEUE_NAME = "hello";
private final static int NUMBER_OF_MESSAGES = 100000;
private static long startTime = 0;
private static long stopTime = 0;
private static long totalTime = 0;

// 3kb
private static final String message = "AMFu8UlKW2zJB..."

public static void main(String[] argv)
 throws java.io.IOException {

 (new Thread(new Send())).start();

}

public void run() {

try {
     ConnectionFactory factory = new ConnectionFactory();
     factory.setHost("localhost");
     Connection connection = factory.newConnection();
     Channel channel = connection.createChannel();

     for (int i=1; i <= NUMBER_OF_MESSAGES; i++){
         startTime = System.nanoTime();

         // No Persistence
         // channel.queueDeclare(QUEUE_NAME, false, false, false, null);
         channel.basicPublish("", QUEUE_NAME, null, message.getBytes());

         stopTime = System.nanoTime();
         totalTime = totalTime + stopTime-startTime;
         System.out.println(i + "," + Long.toString(stopTime-startTime));
     }

     channel.close();
     connection.close();

 } catch (Exception e) {
     e.printStackTrace();
 }
}
}

RabbitMQ Recv.java

private final static String QUEUE_NAME = "hello";
private static long startTime = 0;
private static long stopTime = 0;
private static long totalTime = 0;
private static int numMessages = 0;

public static void main(String[] argv) {
    (new Thread(new Recv())).start();
}

public void run(){
    try {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("x.x.x.x");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // No Persistence
        // channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(QUEUE_NAME, true, consumer);

        while (true) {
            startTime = System.nanoTime();
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            stopTime = System.nanoTime();
            totalTime = totalTime + stopTime-startTime;

            System.out.println(numMessages + "," + Long.toString(stopTime-startTime));

            numMessages++;

        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}
}

Upvotes: 1

Views: 4203

Answers (1)

pfreixes
pfreixes

Reputation: 449

Well, I had a look of your code and the marks of your benchmarck, but just at Recv way. I saw that RabbitMq numbers are a double time than ActiveMq. Then I saw the source code of both, and something warned to me ..

At Rabbitqm Recv source code you are doing always a queuDeclare for every messages, if the comunications time is the current major latence sure the double time from ActiveMq than Rabbitmq comes from here.

Upvotes: 2

Related Questions