Prateek

Reputation: 333

Is it fine to use a single channel for communication with a single-thread executor in RabbitMQ?

I am trying to interact with a RabbitMQ server using the RabbitMQ Java client API. The Java client API guide says:

As a rule of thumb, sharing Channel instances between threads is something to be avoided. Applications should prefer using a Channel per thread instead of sharing the same Channel across multiple threads.

I am using a ThreadPoolExecutor with a corePoolSize of 1 and submitting Runnable tasks to it that save messages in RabbitMQ queues. Here is the code that I am using:

package common;

import java.io.IOException;
import java.io.InputStream;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.Properties;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.gson.JsonObject;
import com.rabbitmq.client.BlockedListener;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import com.rabbitmq.client.ShutdownListener;
import com.rabbitmq.client.ShutdownSignalException;

public class RabbitMQUtil {
    private static Logger log= LoggerFactory.getLogger("logger");
    private static RabbitMQUtil gmInstance;
    private ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(10000));
    private final String PROPERTIES_FILE_NAME = "config/rabbitmq.properties";
    private final Properties properties = new Properties();
    private String host = null;
    private int port = 0;
    private String username = null;
    private String password = null;
    private String useSSL = "false";
    private ConnectionFactory factory;
    private Connection connection;
    private Channel channel;

    private RabbitMQUtil() throws IOException, TimeoutException, Exception {
        try {
            InputStream stream = RabbitMQUtil.class.getClassLoader().getResourceAsStream(PROPERTIES_FILE_NAME);
            if(stream != null) {
                properties.load(stream);
            }
        } catch (Exception ex) {
            log.error("Exception while loading the rabbitmq properties file:", ex);
        }

        host = properties.getProperty("rabbitmq.host", "localhost");
        port = Integer.parseInt(properties.getProperty("rabbitmq.port", "5672"));
        username = properties.getProperty("rabbitmq.username", "guest");
        password = properties.getProperty("rabbitmq.password", "guest");
        useSSL = properties.getProperty("rabbitmq.usessl", "false");

        factory = new ConnectionFactory();
        factory.setHost(host);
        factory.setPort(port);
        factory.setUsername(username);
        factory.setPassword(password);
        if("true".equalsIgnoreCase(useSSL)) {
            try {
                factory.useSslProtocol();
            } catch (KeyManagementException | NoSuchAlgorithmException e) {
                log.error("Exception while applying the tls for rabbitmq:", e);
            }
        }
        connection = factory.newConnection();
        connection.addBlockedListener(new RabbitMQBlockedListener());
        connection.addShutdownListener(new RabbitMQShutDownListener());

        channel = connection.createChannel();
    }

    public static RabbitMQUtil getInstance() {
        if(gmInstance == null) {
            synchronized (RabbitMQUtil.class) {
                if(gmInstance == null) {
                    try {
                        gmInstance = new RabbitMQUtil();
                    } catch (IOException | TimeoutException e) {
                        log.error("Exception in getInstance:", e);
                    } catch (Exception e) {
                        log.error("Exception in getInstance:", e);
                    }
                }
            }
        }
        return gmInstance;
    }

    public static void saveErrorMessagesInLogs(JsonObject obj, String queueName) {
        log.info("data to be saved for :"+queueName+" is:"+obj.toString());
    }

    public void saveMsgInQueue(JsonObject gson, String queueName) {
        this.executor.execute(new RabbitMQData(gson, queueName));
    }

    private class RabbitMQBlockedListener implements BlockedListener {
        @Override
        public void handleBlocked(String arg0) throws IOException {
            // SLF4J only logs the argument if the message contains a {} placeholder.
            log.warn("blocked listener called: {}", arg0);
        }

        @Override
        public void handleUnblocked() throws IOException {
            log.warn("unblocked listener called:");
        }
    }

    private class RabbitMQShutDownListener implements ShutdownListener {
        @Override
        public void shutdownCompleted(ShutdownSignalException cause) {
            log.error("Shutdown event listener called:", cause);
            log.error("shutdown event listener:"+cause.isHardError());
        }
    }

    private class RabbitMQData implements Runnable{
        JsonObject obj;
        String queueName;
        public RabbitMQData(JsonObject obj, String queueName) {
            this.obj = obj;
            this.queueName = queueName;
        }

        @Override
        public void run() {
            // Rename the thread here: the constructor runs on the caller's thread, not the executor's worker.
            Thread.currentThread().setName("RabbitMQ Thread:"+obj.get("userid")+" -->"+queueName);
            try {
                channel.queueDeclare(this.queueName, true, false, false, null);
                channel.basicPublish("", this.queueName, MessageProperties.PERSISTENT_BASIC, this.obj.toString().getBytes());
            } catch (Exception e) {
                log.info("Error while running the scheduled rabbitmq task:", e);
                log.info("data to be saved for :"+this.queueName+" is:"+this.obj.toString());
            }
        }
    }

    public static void saveRabbitMQData(JsonObject obj, String queueName) {
        RabbitMQUtil util = RabbitMQUtil.getInstance();
        if(util != null) 
            util.saveMsgInQueue(obj, queueName);
        else
            RabbitMQUtil.saveErrorMessagesInLogs(obj, queueName);
    }
}

I would like to know the following things:

  1. Is it fine to use a single channel when a thread pool of only 1 thread is used?
  2. How should the connection and channel objects be handled when blocked/unblocked and shutdown events are triggered, given that the API automatically re-establishes the connection when the RabbitMQ server is up again?

Any other feedback will be appreciated.

Thank you

Upvotes: 1

Views: 284

Answers (1)

Jose Zevallos

Reputation: 725

1.- Is it fine to use a single channel when a thread pool of only 1 thread is used?

Yes, it is fine, and it is the way you should do it: a Channel instance should be used by only one thread. Otherwise, publisher confirmations might be lost (see the Channel Javadoc: https://www.rabbitmq.com/releases/rabbitmq-java-client/v3.1.1/rabbitmq-java-client-javadoc-3.1.1/com/rabbitmq/client/Channel.html)
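
As a rough illustration of confining a channel to one thread, here is a minimal sketch that uses only the JDK. `RecordingChannel` is a hypothetical stand-in for `com.rabbitmq.client.Channel` (it is not part of the RabbitMQ client), and `ConfinedPublisher` and the thread name `rabbitmq-publisher` are made up for the example. The point is only that every call on the channel runs on the executor's single worker thread, no matter which thread submits the message.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a RabbitMQ Channel: deliberately NOT thread-safe,
// and it records the name of every thread that touches it.
class RecordingChannel {
    final List<String> published = new ArrayList<>();
    final Set<String> threadNames = new HashSet<>();

    void publish(String queueName, String body) {
        threadNames.add(Thread.currentThread().getName());
        published.add(queueName + ":" + body);
    }
}

class ConfinedPublisher {
    private final RecordingChannel channel = new RecordingChannel();

    // One worker thread, so the channel is only ever used by that thread.
    private final ExecutorService executor =
            Executors.newSingleThreadExecutor(r -> new Thread(r, "rabbitmq-publisher"));

    // Safe to call from any thread: the channel call itself runs on the worker.
    void submit(String queueName, String body) {
        executor.execute(() -> channel.publish(queueName, body));
    }

    // Stops accepting work, waits for queued tasks, then exposes the channel for inspection.
    RecordingChannel shutdownAndGet() {
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return channel;
    }
}
```

With a real client, the body of the submitted task would be the `channel.basicPublish(...)` call from the question; the confinement argument stays the same.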

2.- How should the connection and channel objects be handled when blocked/unblocked and shutdown events are triggered, given that the API automatically re-establishes the connection when the RabbitMQ server is up again?

When the application is shutting down, you should close the channel, then close the connection to RabbitMQ.

    channel.close();
    connection.close();
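
When a single-thread executor owns the channel, as in the question, one possible shutdown order is: stop the executor and let queued tasks drain, then close the channel, then close the connection. The sketch below is only an assumption about how that could be wired up. `OrderedShutdown` is a hypothetical helper, the 10-second timeout is arbitrary, and the channel and connection are passed as plain `AutoCloseable` so the example needs nothing beyond the JDK.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: drains the publishing executor, then closes channel and connection in that order.
class OrderedShutdown {

    static void shutdown(ExecutorService executor, AutoCloseable channel, AutoCloseable connection) {
        executor.shutdown(); // stop accepting new publish tasks
        try {
            // Give already-queued tasks a chance to publish before the channel goes away.
            if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
        closeQuietly(channel);    // channel first
        closeQuietly(connection); // then the connection
    }

    // A failure to close one resource should not prevent closing the next one.
    private static void closeQuietly(AutoCloseable resource) {
        try {
            resource.close();
        } catch (Exception e) {
            System.err.println("Error while closing resource: " + e);
        }
    }
}
```

With the real client this could be called as `OrderedShutdown.shutdown(executor, channel::close, connection::close)`, for example from a JVM shutdown hook.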

About blocking/unblocking, please read the API guide (https://www.rabbitmq.com/api-guide.html):

Callbacks to Consumers are dispatched in a thread pool separate from the thread that instantiated its Channel. This means that Consumers can safely call blocking methods on the Connection or Channel, such as Channel#queueDeclare or Channel#basicCancel.

Each Channel has its own dispatch thread. For the most common use case of one Consumer per Channel, this means Consumers do not hold up other Consumers. If you have multiple Consumers per Channel be aware that a long-running Consumer may hold up dispatch of callbacks to other Consumers on that Channel.

Upvotes: 2
