Reputation: 9107
I am using MQs for the first time and attempting to implement a logging system with RabbitMQ. My implementation involves a 'sender'
* This class sends messages over MQ
public class MQSender {
private final static String EXCHANGE_NAME = "mm_exchange";
private final static String[] LOG_LEVELS = {"green", "orange", "red", "black"};
public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
* Boilerplate stuff
ConnectionFactory factory = new ConnectionFactory();
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//declare the exchange that messages pass through, type=direct
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String[] levels = {"green", "orange", "red", "black"};
for (String log_level : levels) {
String message = "This is a " + log_level + " message";
System.out.println("Sending " + log_level + " message");
//publish the message with each of the bindings in levels
channel.basicPublish(EXCHANGE_NAME, log_level, null, message.getBytes());
Which sends one message for each of my colors to the exchange, where the color will be used as bindings. And it involves a 'receiver'
public class MQReceiver {
private final static String EXCHANGE_NAME = "mm_exchange";
private final static String[] LOG_LEVELS = {"green", "orange", "red", "black"};
public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
public static void receiveMessagesFromQueue(int maxLevel) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
* Boilerplate stuff
ConnectionFactory factory = new ConnectionFactory();
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//declare the exchange that messages pass through, type=direct
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
//generate random queue
String queueName = channel.queueDeclare().getQueue();
//set bindings from 0 to maxLevel for the queue
for (int level = 0; level <= maxLevel; level++) {
channel.queueBind(queueName, EXCHANGE_NAME, LOG_LEVELS[level]);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, consumer);
while(true) {
//waits until a message is delivered then gets that message
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
String routingKey = delivery.getEnvelope().getRoutingKey();
System.out.println(" [x] Received '" + routingKey + "':'" + message + "'");
which is given as a parameter a number representing which color bindings I would like it to be fed from the exchange.
In my implementation, and in RabbitMQ in general, it seems like messages are stored in the exchange until the Consumer
asks for them, at which point they are distributed to their respective queues and then sent one at a time to the client (or consumer in MQ lingo). My problem is that when I run the MQSender
class before running the MQReceiver
class the messages never get delivered. But when I run the MQReceiver
class first the messages are received. From my understanding of MQ I would think that the messages should be stored on the server until the MQReceiver
class is run, then the messages should be delivered to their consumers, however this is not what is happening. My main question is whether these messages can be stored in an exchange and if not, where should they be stored so that they will be delivered once a consumer (i.e. my MQReceiver
class) is called?
Thanks for your help!
Upvotes: 1
Views: 1901
Reputation: 12224
RabbitMQ discards messages if their routing key doesn't match any queues bound to the exchange. When you start MQSender
first, no queues are bound, so the messages it sends are lost. When you start MQReceiver
, it binds queues to the exchange, so RabbitMQ has a place to put the message from MQSender
. When you stop MQReceiver, since you created an anonymous queue, the queue and all bindings are removed from the exchange.
If you want messages to be stored on the server while MQReceiver
is not running, you need the receiver to create a named queue, and bind the routing keys to that queue. Note that creating a named queue is idempotent, and the queue won't be created if it already exists. Then you need the receiver to pull messages off the named queue.
Change your code to look something like this:
String namedQueue = "logqueue";
//declare named queue and bind log level routing keys to it.
//RabbitMQ will put messages with matching routing keys in this queue
channel.queueDeclare(namedQueue, false, false, false, null);
for (int level = 0; level < LOG_LEVELS.length; level++) {
channel.queueBind(namedQueue, EXCHANGE_NAME, LOG_LEVELS[level]);
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
QueueingConsumer consumer = new QueueingConsumer(channel);
//Consume messages off named queue instead of anonymous queue
String namedQueue = "logqueue";
channel.basicConsume(namedQueue, true, consumer);
while(true) {
Upvotes: 2