Reputation: 142
I have a RabbitMQ client application that listens to a specific queue. The client creates an instance of DefaultConsumer and implements the handleDelivery method. Here is the code:
protected LinkedBlockingQueue<Message> messages = new LinkedBlockingQueue<>();

public void receiveMessages() {
    try {
        // channel.basicQos(pollCount);
        Message message = new Message();
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                long deliveryTag = envelope.getDeliveryTag();
                String response = new String(body, "UTF-8");
                if (response != null) {
                    message.setId(NUID.nextGlobal());
                    message.setPayload(response);
                    message.setDeliveryTag(deliveryTag);
                    messages.add(message);
                    logger.info("Message received: ", message.getPayload());
                }
            }
        };
        logger.debug("**********Channel status: " + channel.isOpen());
        channel.basicConsume(queueName, false, consumer);
    } catch (Exception e) {
        logger.error("Exception while getting messages from Rabbit ", e);
    }
}
The method receiveMessages() is called from a thread every 500 ms, and that thread then drains the messages into a different List for consumption. Because of this polling, I observed that new consumer tags are continuously created and the list keeps growing in the RabbitMQ console, as shown in the picture. Is it normal to see those increasing consumer tags?
Upvotes: 0
Views: 2964
Reputation: 9657
Is it normal to see those increasing consumer tags?
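No, your code has an error. Each call to channel.basicConsume registers a new consumer with its own consumer tag, so calling receiveMessages() every 500 ms keeps adding consumers. You need to either use a single long-running consumer or cancel your consumer when you are done with it.
I can't see any need to "poll" receiveMessages(): just call it once and let it run on its own, and it will add messages to your synchronized queue as you expect.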
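To illustrate the split between registering once and polling only the buffer, here is a minimal sketch that uses no RabbitMQ types at all. The class MessageBuffer and its methods are hypothetical stand-ins: startConsuming() marks the place where channel.basicConsume would be called, and onDelivery() plays the role of the delivery callback.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for the class in the question; no RabbitMQ types are used.
class MessageBuffer {
    private final LinkedBlockingQueue<String> messages = new LinkedBlockingQueue<>();
    private int registrations = 0;

    // Call this exactly once at startup. In the real client this is where
    // channel.basicConsume(...) would be invoked.
    void startConsuming() {
        registrations++;
    }

    // Stands in for the delivery callback that the client library invokes.
    void onDelivery(String payload) {
        messages.add(payload);
    }

    // Called by the polling thread (every 500 ms in the question).
    // It only empties the buffer; it never registers a consumer.
    List<String> drain() {
        List<String> batch = new ArrayList<>();
        messages.drainTo(batch);
        return batch;
    }

    int registrations() {
        return registrations;
    }
}
```

The polling thread can call drain() as often as it likes; the number of registered consumers stays at one because registration happens only in startConsuming().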
Upvotes: 1
Reputation: 1
public NotificationConsumerService(ConnectionFactory connectionFactory, String host, Logger logger) {
    this.connectionFactory = connectionFactory;
    this.host = host;
    this.logger = logger;
}

public void consumeSliceChangeNotification() {
    connectionFactory.setHost(this.host);
    try {
        // No try-with-resources here: closing the connection and channel
        // right after basicConsume would stop the consumer immediately.
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            JSONObject obj = new JSONObject(message);
            String namespace = obj.getString("namespace");
            logger.info("The latest change notification on the " + namespace + " is available");
        };
        // Register the consumer once; autoAck is true, so no manual ack is needed
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
    } catch (IOException | TimeoutException e) {
        e.printStackTrace();
    }
}
Upvotes: 0
Reputation: 142
I finally found a working solution.
As Luke Bakken highlighted, no polling is required. I now call receiveMessages() only once. After that, my consumer receives callbacks as messages are published to the queue.
protected LinkedBlockingQueue<Message> messages = new LinkedBlockingQueue<>();

public void receiveMessages() {
    try {
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            long deliveryTag = delivery.getEnvelope().getDeliveryTag();
            String response = new String(delivery.getBody(), "UTF-8");
            if (response != null) {
                // Create a new Message per delivery so earlier queue entries are not overwritten
                Message message = new Message();
                message.setId(NUID.nextGlobal());
                message.setPayload(response);
                message.setDeliveryTag(deliveryTag);
                messages.add(message);
                // The {} placeholder assumes an SLF4J-style logger
                logger.info("Message received: {}", message.getPayload());
            }
        };
        // autoAck is false, so each delivery still has to be acknowledged using its delivery tag
        channel.basicConsume(queueName, false, deliverCallback, consumerTag -> { });
    } catch (Exception e) {
        logger.error("Exception while getting messages from Rabbit ", e);
    }
}
The RabbitMQ console now shows only one consumer tag entry under the bound queue.
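A related detail when filling a queue from a delivery callback: each delivery needs its own object. If one mutable instance is created once and reused for every delivery, all entries in the queue refer to the same object and show only the latest payload. The sketch below shows the difference without any RabbitMQ types; SharedInstanceDemo and its nested Msg class are made-up names for illustration, not the Message class from the post.

```java
import java.util.concurrent.LinkedBlockingQueue;

class SharedInstanceDemo {
    // Hypothetical, simplified stand-in for a message holder.
    static class Msg {
        String payload;
    }

    // Reuses one Msg for every delivery: all queue entries alias the same object.
    static LinkedBlockingQueue<Msg> reuseOneInstance(String... payloads) {
        LinkedBlockingQueue<Msg> queue = new LinkedBlockingQueue<>();
        Msg shared = new Msg();
        for (String p : payloads) {
            shared.payload = p; // overwrites what earlier entries appear to hold
            queue.add(shared);
        }
        return queue;
    }

    // Allocates a new Msg per delivery: each entry keeps its own payload.
    static LinkedBlockingQueue<Msg> newInstancePerDelivery(String... payloads) {
        LinkedBlockingQueue<Msg> queue = new LinkedBlockingQueue<>();
        for (String p : payloads) {
            Msg fresh = new Msg();
            fresh.payload = p;
            queue.add(fresh);
        }
        return queue;
    }
}
```

With the reused instance, the head of the queue reports the last payload that was written; with one instance per delivery, it reports the first payload, as a consumer would expect.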
Upvotes: 1