JaskeyLam

Reputation: 15755

Only one thread is running concurrently in executor service, RabbitMQ

I have created a connection with a specified thread pool of 20 threads:

        ConnectionFactory factory = new ConnectionFactory();
        ....
        // use the specified executor service for consumer dispatch
        ExecutorService consumerExecutor = Executors.newFixedThreadPool(threadNum, threadFactory);
        connection = factory.newConnection(consumerExecutor, addresses);

Then I create a channel from this connection:

        final Channel channel = connection.createChannel();

and use it to create a DefaultConsumer.
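The registration looks roughly like this (the queue name "task_queue" and the doWork call are simplified placeholders, not my real code):

        channel.basicQos(20); // allow several unacked deliveries on this channel
        channel.basicConsume("task_queue", false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                doWork(body); // application-specific processing
                getChannel().basicAck(envelope.getDeliveryTag(), false);
            }
        });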

However, I find that although the pool's threads are available to consume messages, only one thread is ever consuming at a time, even when a massive number of messages has accumulated on the server.

I looked into the client's source code and found:

private final class WorkPoolRunnable implements Runnable {

    @Override
    public void run() {
        int size = MAX_RUNNABLE_BLOCK_SIZE;
        List<Runnable> block = new ArrayList<Runnable>(size);
        try {
            Channel key = ConsumerWorkService.this.workPool.nextWorkBlock(block, size);
            if (key == null) return; // nothing ready to run
            try {
                for (Runnable runnable : block) {
                    runnable.run();
                }
            } finally {
                if (ConsumerWorkService.this.workPool.finishWorkBlock(key)) {
                    ConsumerWorkService.this.executor.execute(new WorkPoolRunnable());
                }
            }
        } catch (RuntimeException e) {
            Thread.currentThread().interrupt();
        }
    }
}


/* Basic work selector and state transition step */
private K readyToInProgress() {
    K key = this.ready.poll();
    if (key != null) {
        this.inProgress.add(key);
    }
    return key;
}


/**
 * Return the next <i>ready</i> client,
 * and transfer a collection of that client's items to process.
 * Mark client <i>in progress</i>.
 * If there is no <i>ready</i> client, return <code><b>null</b></code>.
 * @param to collection object in which to transfer items
 * @param size max number of items to transfer
 * @return key of client to whom items belong, or <code><b>null</b></code> if there is none.
 */
public K nextWorkBlock(Collection<W> to, int size) {
    synchronized (this) {
        K nextKey = readyToInProgress();
        if (nextKey != null) {
            VariableLinkedBlockingQueue<W> queue = this.pool.get(nextKey);
            drainTo(queue, to, size);
        }
        return nextKey;
    }
}

The trick seems to be in ConsumerWorkService.this.workPool.nextWorkBlock: it polls a channel from the ready queue, and the channel is only put back into the ready queue by finishWorkBlock in the finally clause, after all the callbacks in the block have run. Please correct me if I am wrong.

This is confusing, since a consumer is bound to one channel and that channel is not released back to the ready queue until the current work block is finished, which means the thread pool effectively offers only one thread to that consumer.

Questions:

  1. Why does RabbitMQ design this model?
  2. How can we work around this issue?
  3. Is it good practice to submit the task to a standalone thread pool in handleDelivery to consume messages and also to ack there (to ensure a message is acked only after its task finishes)?

Upvotes: 1

Views: 2406

Answers (1)

wheleph

Reputation: 8354

> 1. Why does RabbitMQ design this model?

I would like to know the reason myself, but this behaviour is clearly reflected in their documentation:

> Each Channel has its own dispatch thread. For the most common use case of one Consumer per Channel, this means Consumers do not hold up other Consumers. If you have multiple Consumers per Channel be aware that a long-running Consumer may hold up dispatch of callbacks to other Consumers on that Channel.

> 2. How can we work around this issue?

You can either have multiple channels or decouple message consumption from processing by submitting the actual work to another thread pool. You can find more details in this article.
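For the multiple-channels option, a minimal sketch (the queue name "task_queue", the prefetch value, the channel count and doWork are illustrative placeholders):

    // One consumer per channel: each channel gets its own dispatch thread,
    // so up to channelCount messages can be processed concurrently.
    int channelCount = 20;
    for (int i = 0; i < channelCount; i++) {
        final Channel ch = connection.createChannel();
        ch.basicQos(1); // at most one unacked delivery per channel
        ch.basicConsume("task_queue", false, new DefaultConsumer(ch) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                doWork(body); // application-specific processing
                getChannel().basicAck(envelope.getDeliveryTag(), false);
            }
        });
    }

Note that the ExecutorService passed to newConnection should have at least as many threads as the number of channels you want dispatching concurrently; otherwise some channels will still wait for a free thread.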

> 3. Is it good practice to submit the task to a standalone thread pool in handleDelivery to consume messages and also to ack there (to ensure a message is acked only after its task finishes)?

Quote from the docs:

> When manual acknowledgements are used, it is important to consider what thread does the acknowledgement. If it's different from the thread that received the delivery (e.g. Consumer#handleDelivery delegated delivery handling to a different thread), acknowledging with the multiple parameter set to true is unsafe and will result in double-acknowledgements, and therefore a channel-level protocol exception that closes the channel. Acknowledging a single message at a time can be safe.
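With that in mind, here is a minimal sketch of the decoupling option that acks one message at a time (multiple = false) from the worker thread; workerPool, "task_queue" and doWork are illustrative names:

    ExecutorService workerPool = Executors.newFixedThreadPool(20);
    channel.basicQos(20); // cap unacked deliveries handed to the worker pool
    channel.basicConsume("task_queue", false, new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope,
                                   AMQP.BasicProperties properties, byte[] body) {
            final long deliveryTag = envelope.getDeliveryTag();
            workerPool.submit(() -> {
                try {
                    doWork(body); // application-specific processing
                    getChannel().basicAck(deliveryTag, false); // single ack only
                } catch (Exception e) {
                    try {
                        getChannel().basicNack(deliveryTag, false, true); // requeue on failure
                    } catch (IOException ignored) {
                    }
                }
            });
        }
    });

Because the ack happens on a different thread than the delivery, keeping multiple = false avoids the double-acknowledgement problem described above.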

Upvotes: 4
