TraiTran
TraiTran

Reputation: 83

Multiple Consumer RabbitMQ on one Queue - Java

I'm newbie RabbitMQ java client. My problem: I created 10 consumer and add them into the queue. Every consumer use 10 seconds in order to handle my process. I checked rabbit's page, i seen my queue had 4000 message no send to client. I checked log client and result was get one message for one consumer, after 10 seconds I get one message for one consumer and so on .. I want get 10 message for all consumer at the time(10 message - 10 consumer process at the time) Please help me, I didn't find solution for problem. Thank a lot.

        while (!isRetry) {
        try {
            isRetry = true;
            connection = mConnectionFactory.newConnection(addresses.toArray(new Address[addresses.size()]));
            String queueName = "webhook_customer";
            String exchangeName = "webhook_exchange";
            String routingKey = "customer";
            System.out.println("step2");

            Channel channel = connection.createChannel();
            channel.exchangeDeclare(exchangeName, "topic", true);
            channel.queueDeclare(queueName, true, false, false, null);
            channel.queueBind(queueName, exchangeName, routingKey);
            channel.basicQos(1);
            for (int i = 0; i < numberWorker; i++) {
                Consumer consumer = new QueueingConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope,
                                               AMQP.BasicProperties properties, byte[] body) throws IOException {
                        long startProcess = System.nanoTime();
                        JSONObject profile = null;
                        try {

                        } catch (IOException ioe) {
                            handleLogError(profile, ioe.getMessage().toString());
                        } catch (Exception e) {
                            handleLogError(profile, e.getMessage());
                        } finally {
                            channel.basicAck(envelope.getDeliveryTag(), false);
                            long endProcess = System.nanoTime();
                            _logger.info("===========######### TIME PROCESS  + " + (endProcess - startProcess) + " Nano Seconds  ========#### " + (endProcess - startProcess) / 1000000 + " Milli Seconds");
                        }
                    }
                };

                channel.basicConsume(queueName, false, consumer);
            }
            System.out.printf("Start Listening message ...");
        } catch (Exception e) {
            System.out.println("exception " + e.getMessage());
            isRetry = closeConnection(connection);
            e.printStackTrace();
        } finally {
        }
        if (!isRetry) {
            try {
                System.out.println("sleep waiting retry ...");
                Thread.sleep(30000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        //END
    }

Upvotes: 0

Views: 4041

Answers (2)

TraiTran
TraiTran

Reputation: 83

I did found solution in my case. I use new thread in consumer when message come in and I process in it. And I create multiple channel in order to multiple message at the time. I use threadpool to control thread

Upvotes: 1

Simon
Simon

Reputation: 2423

From your code sample, it seems that you could use the QueueingConsumer instead of DefaultConsumer. This will pull out more messages from RabbitMQ to the consumer(s) and queue them until they are processed.

Then, in your for (int i = 0; i < 10; i++) loop, you are consuming 10 times with the same consumer instance. You should instead create 10 consumers as such:

for (int i = 0; i < 10; i++) {
    Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
          channel.basicAck(envelope.getDeliveryTag(),false);
        }
    };

    channel.basicConsume(queueName, false, consumer);
}

Ideally, create another class and properly create new instances instead of anonymous instances in the loop.

Note: Your consumers should execute their process in the background (separate thread) otherwise they will block eachother. Although, the information you provided does not really show how you will actually handle the messages.

Upvotes: 0

Related Questions