vinkomlacic
vinkomlacic

Reputation: 1879

How to configure RabbitMQ to serve multiple queues to multiple consumers equally

I have a RabbitMQ broker with multiple queues set up. On the client side (Java) I have multiple consumers which are all listening to their queue like so:

QUEUE_1 -> DataConsumer1; QUEUE_2 -> DataConsumer2 ...

They are all using one connection but different channel. What happens is when I load all the queues and start the application broker serves first one queue than the other and so on. So messages are received by their respective consumer one queue at the time. I would also like to mention that I'm using prefetch count of 1 in an attempt to achieve fair distribution of consumer traffic.

How can I make it happen so that all queues are served equally.

EDIT: Here is the code that creates consumers (pretty basic)

import com.rabbitmq.client.*;

import org.apache.log4j.Logger;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.TimeoutException;

/**
 * Used for consuming and acknowledging messages from defined queue.
 *
 */
public class Consumer {
    private final static Logger logger = Logger.getLogger(Consumer.class);
    // Maximum number of messages that can be on the consumer at a time
    private static int prefetchCount = 1;

    // Internal enum which contains queue names and their exchange keys
    private Queue queue;
    private Channel channel;
    private String consumerTag;
    private String uuid = UUID.randomUUID().toString();
    private boolean subscribed = false;
    private DeliverCallback deliverCallback = this::handleDeliver;
    private CancelCallback cancelCallback = this::handleCancel;
    private ConsumerShutdownSignalCallback consumerShutdownSignalCallback = this::handleShutdown;

    /**
     * The constructors sets the channel to RabbitMQ broker for the specified queue.
     * Callback for events are set to their default implementation.
     *
     * @param queue RabbitMQ queue - this consumer will be assigned to this queue and will only be able to consume from it.
     * @see #setDeliverCallback(DeliverCallback)
     * @see #setCancelCallback(CancelCallback)
     * @see #setConsumerShutdownSignalCallback(ConsumerShutdownSignalCallback)
     */
    public Consumer(Queue queue) {
        this.queue = queue;

        try {
            setUpChannel();

        } catch (IOException e) {
            e.printStackTrace();

        }
    }

    public Class getEntityClass() {
        return Queue.getEntityClassForQueue(queue);
    }

    public String getUuid() {
        return uuid;
    }

    public boolean isSubscribed() {
        return subscribed;
    }

    public DeliverCallback getDeliverCallback() {
        return deliverCallback;
    }

    public void setDeliverCallback(DeliverCallback deliverCallback) {
        this.deliverCallback = deliverCallback;
    }

    public CancelCallback getCancelCallback() {
        return cancelCallback;
    }

    public void setCancelCallback(CancelCallback cancelCallback) {
        this.cancelCallback = cancelCallback;
    }

    public ConsumerShutdownSignalCallback getConsumerShutdownSignalCallback() {
        return consumerShutdownSignalCallback;
    }

    public void setConsumerShutdownSignalCallback(ConsumerShutdownSignalCallback consumerShutdownSignalCallback) {
        this.consumerShutdownSignalCallback = consumerShutdownSignalCallback;
    }


    /**
     * <p>
     * Subscribes to the set queue. The subscription can be cancelled using
     * Checks if the queue is set up properly.
     * </p>
     * <p>
     * Note: this is a non-blocking operation. The client will listen for incoming messages and handle them using
     * the provided DeliveryCallback function but the execution of this operation will be on another thread.
     * </p>
     *
     * @throws IOException if I/O problem is encountered.
     */
    public void subscribeToQueue() throws IOException {
        if (channel != null) {
            consumerTag = channel.basicConsume(
                    queue.getQueueName(),
                    deliverCallback,
                    cancelCallback,
                    consumerShutdownSignalCallback
            );
            subscribed = true;

        } else {
            logger.error("Channel does not exist. Unable to consume message.");

        }
    }

    /**
     * Confirms the message has been successfully processed.
     *
     * @param deliveryTag Unique message tag generated by the server.
     * @throws IOException if I/O problem is encountered.
     */
    public void acknowledgeMessageReceived(long deliveryTag) throws IOException {
        if (channel != null) {
            channel.basicAck(deliveryTag, false);

        } else {
            logger.error("Channel does not exist. Unable to acknowledge message delivery.");

        }
    }

    /**
     * Sends a negative acknowledgement to RabbitMQ without re-queueing the message.
     *
     * @param deliveryTag Unique message tag generated by the server.
     * @throws IOException if I/O problem is encountered.
     */
    public void rejectMessage(long deliveryTag) throws IOException {
        if (channel != null) {
            channel.basicReject(deliveryTag, false);

        } else {
            logger.error("Channel does not exist. Unable to reject message delivery.");

        }
    }

    /**
     * Cancels consumer subscription to the queue.
     * The consumer can be used for acknowledging messages, but will not receive new messages.
     * This does not close the underlying channel. To close the channel use closeChannel() method.
     *
     * @throws IOException
     * @see #subscribeToQueue()
     * @see #closeChannel()
     */
    public void cancelSubscription() throws IOException {
        if (channel != null) {
            channel.basicCancel(this.consumerTag);
            subscribed = false;

        } else {
            logger.error("Channel does not exist. Unable to cancel consumer subscription.");
        }
    }

    /**
     * Explicitly closes channel to the queue.
     * After doing this you will not be able to use any of the methods of this class.
     *
     * @throws IOException      if I/O problem is encountered.
     * @throws TimeoutException if connection problem occurs.
     */
    public void closeChannel() throws IOException, TimeoutException {
        if (channel != null) {
            channel.close();
            channel = null;
            logger.info("Closing RabbitMQ consumer channel...");

        } else {
            logger.error("Channel already closed.");

        }
    }

    /**
     * Checks if the queue exists and creates the channel.
     * If the queue does not exist channel is set to null and cannot be used.
     *
     * @throws IOException if I/O problem is encountered.
     */
    private void setUpChannel() throws IOException {

        channel = ChannelFactory.getInstance().createChannel();
        try {
            channel.queueDeclarePassive(queue.getQueueName());
            channel.basicQos(prefetchCount);

        } catch (IOException e) {
            // When this exception occurs it renders the channel unusable so it's best set to null.
            channel = null;

            logger.error(String.format("Queue %s does not exist [%s]", queue.getQueueName(), e.getMessage()));
            e.printStackTrace();

        }
        logger.info("Setting up RabbitMQ consumer channel. Channel successfully initialized: " + (channel != null));
    }

    /**
     * Callback called when a message is delivered to the client.
     * Default implementation. Callback acknowledges message received and does nothing with it.
     * To use custom implementation use setDeliverCallback method.
     *
     * @param consumerTag The consumer tag associated with the consumer.
     * @param message     Message object.
     * @see #setDeliverCallback(DeliverCallback)
     */
    private void handleDeliver(String consumerTag, Delivery message) {
        Envelope envelope = message.getEnvelope();
        long deliveryTag = envelope.getDeliveryTag();

        logger.info("Message delivered: " + deliveryTag);

        try {
            channel.basicAck(deliveryTag, false);

        } catch (IOException e) {
            e.printStackTrace();

        }
    }

    /**
     * Callback called when a service is cancelled.
     * Default implementation. To use custom implementation specify it in the constructor.
     *
     * @param consumerTag The consumer tag associated with the consumer.
     */
    private void handleCancel(String consumerTag) {
        logger.info("Consumer (" + consumerTag + ") cancelled: ");
    }

    /**
     * Called when the consumer is abruptly shutdown due to termination of the underlying connection or channel.
     * Default implementation. To use custom implementation specify it in the constructor.
     *
     * @param consumerTag The consumer tag associated with the consumer.
     * @param exception   Shutdown reason.
     */
    private void handleShutdown(String consumerTag, ShutdownSignalException exception) {
        logger.info(String.format("Consumer (%s) shutdown. Reason: %s", consumerTag, exception.getMessage()));
        logger.info(exception);
    }
}

Upvotes: 0

Views: 1573

Answers (1)

vinkomlacic
vinkomlacic

Reputation: 1879

UPDATE: resolved, apparently my prefetch count was not being set therefore it was unlimited. That is why the traffic was locked on one channel until the queue was exhausted.

Upvotes: 0

Related Questions