Miguel
Miguel

Reputation: 939

I need to mock a RabbitMQ in my unit Test

I am using a RabbitMQ in my project.

My consumer contains the RabbitMQ client code, and the connection needs TLS 1.1 to connect to the real MQ.

I want to test this code in my JUnit test and to mock the message delivery to my consumer.

I have found several examples on Google that use different tools, such as Camel RabbitMQ or ActiveMQ, but these tools work with AMQP 1.0 and RabbitMQ only works with AMQP 0.9.

Has anyone had this problem?

Thanks!

UPDATE

This is the code I want to test; it receives a JSON message from the queue.

package com.foo.foo.queue;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.URL;
import java.security.*;
import java.security.cert.CertificateException;
import javax.net.ssl.*;

import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.json.JSONObject;

import com.foo.foo.Constants.Constants;
import com.foo.foo.core.ConfigurationContainer;
import com.foo.foo.policyfinders.PolicyFinder;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

/**
 * Singleton thread that consumes JSON messages from a RabbitMQ fanout exchange over TLS 1.1
 * and hands each message to the configured {@link PolicyFinder}.
 *
 * <p>Usage: obtain the instance with {@link #getInstance()}, inject the policy finder with
 * {@link #setLuxPolicyFinder(PolicyFinder)}, then call {@code start()}. Call {@link #close()}
 * from another thread to stop consuming and release the channel and connection.
 *
 * <p>Connection set-up is best effort: a failure in the constructor is logged, not thrown,
 * and leaves the instance without a channel. {@link #run()} then refuses to start.
 */
public class BrokerThreadHLConsumer extends Thread {

    private static BrokerThreadHLConsumer instance;

    private static final Logger log = LogManager.getLogger(BrokerThreadHLConsumer.class);

    private Channel channel;
    private String queueName;
    private PolicyFinder policyFinder;
    private Connection connection;

    // Written by the consumer thread in run(), read by whichever thread calls close().
    private volatile QueueingConsumer consumer;

    // Stop flag: cleared by close() on another thread, so it must be volatile to be seen by run().
    private volatile boolean loop;

    /**
     * Builds the TLS context from the key stores bundled under {@code certs/} on the classpath,
     * connects to the broker, declares the fanout exchange and binds a server-named queue to it.
     *
     * <p>Any failure is logged and swallowed (the original best-effort contract is kept), so the
     * declared {@code IOException} is retained only for source compatibility with callers.
     */
    private BrokerThreadHLConsumer() throws IOException {
        ConnectionFactory factory = new ConnectionFactory();
        ConfigurationContainer configurationContainer = ConfigurationContainer.getInstance();
        String exchangeName = configurationContainer.getProperty(Constants.EXCHANGE_NAME);
        String rabbitHost = configurationContainer.getProperty(Constants.RABBITMQ_SERVER_HOST_VALUE);
        try {
            // NOTE(review): passphrases are hard-coded; consider moving them to configuration.
            char[] keyPassphrase = "clientrabbit".toCharArray();
            char[] trustPassphrase = "changeit".toCharArray();

            /* Client key/certificate used to authenticate against the message queue */
            KeyStore clientKeyStore = loadKeyStore("PKCS12", "certs/client.keycert.p12", keyPassphrase);
            KeyManagerFactory keyManager = KeyManagerFactory.getInstance("SunX509");
            keyManager.init(clientKeyStore, keyPassphrase);

            /* Trusted certificates used to verify the message queue */
            KeyStore trustStore = loadKeyStore("JCEKS", "certs/cacerts", trustPassphrase);
            TrustManagerFactory tmf = TrustManagerFactory.getInstance("SunX509");
            tmf.init(trustStore);

            SSLContext sslContext = SSLContext.getInstance("TLSv1.1");
            sslContext.init(keyManager.getKeyManagers(), tmf.getTrustManagers(), null);

            factory.setUri(rabbitHost);
            factory.useSslProtocol(sslContext);
            connection = factory.newConnection();
            channel = connection.createChannel();
            channel.exchangeDeclare(exchangeName, "fanout");
            queueName = channel.queueDeclare().getQueue();
            channel.queueBind(queueName, exchangeName, "");
        } catch (GeneralSecurityException e) {
            // Covers the key store, certificate, algorithm and key management failures.
            log.error("Couldn't set up the TLS context for the broker installed in " + rabbitHost, e);
        } catch (Exception e) {
            log.error("Couldn't instantiate a channel with the broker installed in " + rabbitHost, e);
        }
    }

    /**
     * Loads a key store of the given type from a classpath resource.
     *
     * @param type         key store type, e.g. {@code "PKCS12"} or {@code "JCEKS"}
     * @param resourcePath classpath-relative location of the key store file
     * @param passphrase   passphrase protecting the key store
     * @return the loaded key store
     * @throws IOException if the resource is missing or cannot be read
     * @throws GeneralSecurityException if the key store type is unavailable or its content is invalid
     * @throws java.net.URISyntaxException if the resource URL cannot be converted to a file URI
     */
    private static KeyStore loadKeyStore(String type, String resourcePath, char[] passphrase)
            throws IOException, GeneralSecurityException, java.net.URISyntaxException {
        URL resource = BrokerThreadHLConsumer.class.getClassLoader().getResource(resourcePath);
        if (resource == null) {
            throw new IOException("Key store resource not found on classpath: " + resourcePath);
        }
        KeyStore keyStore = KeyStore.getInstance(type);
        // try-with-resources: the original never closed the stream.
        try (FileInputStream in = new FileInputStream(new File(resource.toURI()))) {
            keyStore.load(in, passphrase);
        }
        return keyStore;
    }

    /**
     * Returns the single consumer instance, creating it on first use.
     *
     * <p>Synchronized so that concurrent first calls cannot create two broker connections.
     * The checked exceptions in the signature are kept for backward compatibility; the
     * constructor logs set-up failures instead of throwing them.
     *
     * @return the shared consumer instance
     */
    public static synchronized BrokerThreadHLConsumer getInstance() throws CertificateException, UnrecoverableKeyException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException, IOException {
        if (instance == null) {
            instance = new BrokerThreadHLConsumer();
        }
        return instance;
    }

    /**
     * Consumes messages until {@link #close()} is called, the thread is interrupted or the
     * channel closes. Each message body is parsed as JSON and passed to the policy finder.
     */
    @Override
    public void run() {
        if (policyFinder == null) {
            log.error("Consumer couldn't start cause of PolicyFinder is null");
            return;
        }
        if (channel == null) {
            // The constructor swallows connection failures, so the channel may be missing.
            log.error("Consumer couldn't start because the broker channel was not initialised");
            return;
        }
        try {
            QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
            consumer = queueingConsumer;
            // Second argument 'true' = auto-acknowledge deliveries.
            channel.basicConsume(queueName, true, queueingConsumer);
            log.info("Consumer broker started and waiting for messages");
            loop = true;
            while (loop) {
                try {
                    QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
                    // Decode explicitly as UTF-8 instead of relying on the platform default charset.
                    String message = new String(delivery.getBody(), java.nio.charset.StandardCharsets.UTF_8);
                    JSONObject obj = new JSONObject(message);
                    log.info("Message received from broker " + obj);
                    if (StringUtils.isNotEmpty(message) && !policyFinder.managePolicySet(obj)) {
                        log.error("PolicySet error: error upgrading the policySet");
                    }
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and stop instead of swallowing the interruption.
                    Thread.currentThread().interrupt();
                    loop = false;
                } catch (Exception e) {
                    log.error("Receiving message error", e);
                    if (!channel.isOpen()) {
                        // A closed channel makes nextDelivery() fail on every call; stop rather than spin.
                        log.error("Broker channel is closed; consumer stopped");
                        loop = false;
                    }
                }
            }
        } catch (IOException e) {
            log.error("Consumer couldn't start", e);
        }
    }

    /**
     * Stops the consume loop, cancels the consumer and closes the channel and the connection.
     * Each step is attempted independently, and anything never created or already closed is skipped.
     */
    public void close() {
        loop = false;
        QueueingConsumer activeConsumer = consumer;
        if (activeConsumer != null) {
            try {
                activeConsumer.getChannel().basicCancel(activeConsumer.getConsumerTag());
            } catch (IOException e) {
                log.error("Couldn't cancel the broker consumer", e);
            }
        }
        if (channel != null && channel.isOpen()) {
            try {
                channel.close();
            } catch (IOException e) {
                log.error("Couldn't close the broker channel", e);
            }
        }
        if (connection != null && connection.isOpen()) {
            try {
                connection.close();
            } catch (IOException e) {
                log.error("Couldn't close the broker connection", e);
            }
        }
    }

    /**
     * Injects the policy finder that receives every consumed message.
     * It must be set before the thread is started, otherwise {@link #run()} exits immediately.
     *
     * @param policyFinder handler for incoming policy-set messages
     */
    public void setLuxPolicyFinder(PolicyFinder policyFinder) {
        this.policyFinder = policyFinder;
    }
}

Upvotes: 22

Views: 36564

Answers (3)

Loïc Le Doyen
Loïc Le Doyen

Reputation: 1055

As I understand it, there are two things trying to be tested in the question:

  • TLS configuration to connect to RabbitMQ
  • basicPublish / basicConsume (what's called delivery) behavior regarding interactions with the rest of the application

For the first one, since TLS itself is being tested, only connecting to a real RabbitMQ instance with the correct truststore configured will prove that the configuration works.

For the second one, however, for tests demonstrating features of the app (with tools like Cucumber for readability), you may try a library I'm working on: rabbitmq-mock (which is why I'm digging up an old post).

Just include it as dependency:

<dependency>
    <groupId>com.github.fridujo</groupId>
    <artifactId>rabbitmq-mock</artifactId>
    <version>1.0.14</version>
    <scope>test</scope>
</dependency>

And replace new ConnectionFactory() by new MockConnectionFactory() in your unit test.

Samples are available in the project: https://github.com/fridujo/rabbitmq-mock/blob/master/src/test/java/com/github/fridujo/rabbitmq/mock/IntegrationTest.java

Upvotes: 8

The Ancient
The Ancient

Reputation: 380

I know this is an old question, but since there is no answer so far: what helped me a lot with the same problem was the following blog post: https://tamasgyorfi.net/2016/04/21/writing-integration-tests-for-rabbitmq-based-components/. It uses Apache Qpid (not ActiveMQ as suggested in the OP), which supports AMQP 0.9.1.

Upvotes: 1

NoobEditor
NoobEditor

Reputation: 15871

Here is how I did it. Some details may be a little off because I had to hide the implementation of the classes involved, but you should get the idea! :)

  • Assumptions for the unit test:
    • RMQ is working fine, and data sent to it will be pushed onto the queue
    • The only thing to be tested is whether the generated data is correct
    • and whether the call to RMQ's send() happens or not!

 public class SomeClassTest {
        private Config config;
        private RmqConfig rmqConfig;
        private static final ObjectMapper mapper = new ObjectMapper();
        private JasperServerClient jasperServerClient;
    //    @Mock
        @InjectMocks
        private RabbitMQProducer rabbitMQProducer;
        private Connection phoenixConnection;
        private String targetNotificationMessage;
        SomeClass someClassObject;

        /**
         * Builds the fixture: mocks the collaborators, prepares the RabbitMQ and producer
         * configuration, opens the (mocked) producer and assembles the object under test.
         */
        @Before
        public void setUp() {

            // Mock the collaborators
            config = mock(Config.class);
            // Fixed: the original assigned to the type name "Connection" instead of the field.
            phoenixConnection = mock(Connection.class);
            rabbitMQProducer = mock(RabbitMQProducer.class); // Important: the producer is a mock, so nothing reaches a real broker


            jasperServerClient = mock(JasperServerClient.class);

            rmqConfig = RmqConfig.builder()
                    .host("localhost")
                    .port(5672)
                    .userName("guest")
                    .password("guest")
                    .queueName("somequeue_name")
                    .prefetch(1)
                    .build();
            final String randomMessage = "This is a waste message";
            // Sample message; it is built here but not used further in this method.
            Message mockMsg = Message.forSending(randomMessage.getBytes(), null, rmqConfig.getQueueName(), rmqConfig.getQueueName(), "text/plain", "UTF-8", true);


            // Prepare service configs
            ConnectionConfig connectionConfig = RmqConfigUtil.getConfig(rmqConfig);
            ProducerConfig producerConfig = new ProducerConfigBuilder()
                    .exchange(rmqConfig.getQueueName())
                    .contentType("text/plain") // fixed typo: was "text/pain"
                    .contentEncoding("UTF-8")
                    .connection(connectionConfig).build();
            // Fixed: the original referenced the undefined name "croducerConfig".
            rabbitMQProducer.open(producerConfig.asMap());

            // Build the object under test, wiring in the mocked collaborators
            someClassObject =  SomeClass.builder()
                    .phoenixConnection(phoenixConnection)
                    .userExchangeName(rmqConfig.getQueueName())
                    .userRabbitMQProducer(rabbitMQProducer)
                    .ftpConfig(config.getFtpConfig())
                    .jasperServerClient(jasperServerClient)
                    .objectMapper(new ObjectMapper())
                    .build();

            MockitoAnnotations.initMocks(this);
        }


        /**
         * Invokes two private methods of the class under test through reflection: first the
         * one that builds the notification message, then the one that publishes it through
         * the mocked RabbitMQProducer. Finally verifies that send() was called exactly once.
         * The {@code <<...>>} tokens are placeholders left by the author, not valid Java.
         */
        @Test
        public void testNotificationPub() throws Exception {

            // Prepare the expected value
            targetNotificationMessage = <<some message>>

            // Reflection is needed because the target methods are private
            Class cls = Class.forName("com.some.path.to.class");
            Object[] objForGetMessage = {<<stuffs>>, <<stuffs>>};

            Method getNotificationMessage = cls.getDeclaredMethod("private_fn_1", <<some class>>.class, <<some class>>.class);
            Method pubNotification = cls.getDeclaredMethod("private_fn_2", <<some class>>.class, RabbitMQProducer.class, String.class);

            getNotificationMessage.setAccessible(true);
            pubNotification.setAccessible(true);

            // Test case #1: the generated notification message matches the expected one
            // NOTE(review): JUnit's assertEquals takes (expected, actual); the arguments here are in the opposite order.
            final <<some class>> notificationMessage = (<<some class>>)getNotificationMessage.invoke(someClassObject, objForGetMessage);
            assertEquals(notificationMessage.getMessage(), targetNotificationMessage);

            // Test case #2: publish the message; this is the call that goes to the (mocked) RMQ producer
            // NOTE(review): publishNotificationResp is not declared anywhere in this snippet; presumably the expected publish result.
            Object[] objPubMessage = {notificationMessage, rabbitMQProducer, rmqConfig.getQueueName()};
            final Object publishNotification = pubNotification.invoke(someClassObject, objPubMessage);
            assertEquals(publishNotificationResp, publishNotification); // voila


            // Important: since RabbitMQProducer is mocked, verify that "send", the method that sends data to RMQ, was called exactly once
            verify(rabbitMQProducer,times(1)).send(any());

        }


        /**
         * Sanity check on the fixture: asserts that the {@code rmqConfig} and {@code config}
         * fields are non-null, in that order.
         */
        @Test
        public void testMockCreation() {
            final Object[] fixtures = {rmqConfig, config};
            for (Object fixture : fixtures) {
                assertNotNull(fixture);
            }
        }

Upvotes: 0

Related Questions