NublicPablo

Reputation: 1039

RabbitMQ durable queue bindings

I am trying to reliably send a message from a publisher to multiple consumers using RabbitMQ topic exchange.

I have configured durable queues (one per consumer) and I am sending persistent messages (delivery_mode=2). I have also put the channel in confirm_delivery mode and added the mandatory=True flag to basic_publish.

Right now the service is fairly reliable, but messages to a consumer get lost if that consumer stays down during a broker restart followed by a message publication.

It seems that the broker can recover queues and messages on restart, but it doesn't seem to keep the bindings between the exchange and the queues. So messages only reach one of the consumers and are lost for the one that is down.

Note: messages do reach the queue and the consumer if the broker doesn't restart while a consumer is down. They accumulate properly in the queue and are delivered to the consumer when it comes back up.

Edit - adding consumers code:

import pika


class Consumer(object):
    def __init__(self, queue_name):
        self.queue_name = queue_name

    def consume(self):
        credentials = pika.PlainCredentials(
            username='myuser', password='mypassword')
        connection = pika.BlockingConnection(
            pika.ConnectionParameters(host='myhost', credentials=credentials))
        channel = connection.channel()
        channel.exchange_declare(exchange='myexchange', exchange_type='topic')
        channel.queue_declare(queue=self.queue_name, durable=True)
        channel.queue_bind(
            exchange='myexchange', queue=self.queue_name, routing_key='my.route')
        channel.basic_consume(
            consumer_callback=self.message_received, queue=self.queue_name)
        channel.start_consuming()

    def message_received(self, channel, basic_deliver, properties, body):
        print(f'Message received: {body}')
        channel.basic_ack(delivery_tag=basic_deliver.delivery_tag)

You can assume each consumer server does something similar to:

c = Consumer('myuniquequeue')  # each consumer has a permanent queue name
c.consume()
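For context, RabbitMQ only recovers a binding after a restart when both of its endpoints are durable: the queue and the exchange. The exchange_declare call above defaults to durable=False, so one thing worth checking is a durable exchange declaration, sketched below (same pika API as the code above, not a confirmed fix; note that the existing non-durable exchange would have to be deleted first, since re-declaring an exchange with different properties raises a channel error):

```python
# Sketch: declare the exchange durable as well, so the exchange -- and
# the bindings attached to it -- survive a broker restart. Both the
# publisher and every consumer must declare it with the same properties.
channel.exchange_declare(
    exchange='myexchange',
    exchange_type='topic',
    durable=True,  # exchange (and its bindings) are recovered on restart
)
channel.queue_declare(queue=self.queue_name, durable=True)
channel.queue_bind(
    exchange='myexchange', queue=self.queue_name, routing_key='my.route')
```

With a durable exchange, durable queues, and persistent messages, the whole routing topology is written to disk and restored after a restart.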

Edit - adding publisher code:

def publish(message):
    credentials = pika.PlainCredentials(
        username='myuser', password='mypassword')
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host='myhost', credentials=credentials))
    channel = connection.channel()
    channel.exchange_declare(exchange='myexchange', exchange_type='topic')
    channel.confirm_delivery()
    success = channel.basic_publish(
        exchange='myexchange',
        routing_key='my.route',
        body=message,
        properties=pika.BasicProperties(
            delivery_mode=2,  # make message persistent
        ),
        mandatory=True
    )
    if success:
        print("Message sent")
    else:
        print("Could not send message")
        # Save for sending later

It is worth saying that I am handling the error case on my own, and that is not the part I would like to improve. When my messages get lost for some of the consumers, the flow goes through the success branch.

Upvotes: 2

Views: 5357

Answers (1)

bumblebee

Reputation: 1841

Use basic_ack(delivery_tag=basic_deliver.delivery_tag) in your consumer callback method. The acknowledgement tells the broker whether the consumer has received and processed a message. On a negative acknowledgement with requeue=True, the message is put back on the queue instead of being discarded.
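As a sketch of that negative-acknowledgement path (hypothetical handler; process is an illustrative name, the callback signature matches the question's code):

```python
def message_received(self, channel, basic_deliver, properties, body):
    try:
        process(body)  # hypothetical processing function
        channel.basic_ack(delivery_tag=basic_deliver.delivery_tag)
    except Exception:
        # Negative acknowledgement: requeue=True asks the broker to put
        # the message back on the queue for redelivery rather than drop it.
        channel.basic_nack(delivery_tag=basic_deliver.delivery_tag,
                           requeue=True)
```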

Edit #1: In order to keep receiving messages through a broker crash, the broker needs to be distributed. This is the concept of Mirrored Queues in RabbitMQ. Mirrored Queues let your queues be replicated across the nodes in your cluster. If the node holding a queue goes down, another node holding a mirror of that queue takes over as your broker.

For a complete understanding, refer to Mirrored Queues.
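For what it's worth, mirroring is enabled through a broker policy rather than in application code; a minimal example using the standard rabbitmqctl syntax (the policy name and queue pattern here are illustrative):

```shell
# Mirror every queue whose name starts with "myuniquequeue" across all
# nodes in the cluster; "ha-all" is just an illustrative policy name.
rabbitmqctl set_policy ha-all "^myuniquequeue" '{"ha-mode":"all"}'
```

Note that mirroring addresses node failure in a cluster; it is separate from the durability settings that control what a single node recovers after a restart.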

Upvotes: 0
