Reputation: 1039
I am trying to reliably send a message from a publisher to multiple consumers using a RabbitMQ topic exchange.
I have configured durable queues (one per consumer) and I am sending persistent messages (delivery_mode=2). I have also put the channel in confirm_delivery mode and added the mandatory=True flag to publish.
Right now the service is pretty reliable, but messages for one of the consumers are lost if that consumer stays down during a broker restart followed by a message publication.
It seems that the broker can recover queues and messages on restart, but it doesn't seem to keep the binding between consumers and queues. So messages only reach one of the consumers and are lost for the one that is down.
Note: Messages do reach the queue and the consumer if the broker doesn't suffer a restart during the time a consumer is down. They accumulate properly on the queue and they are delivered to the consumer when it is up again.
Edit - adding consumers code:
import pika


class Consumer(object):
    def __init__(self, queue_name):
        self.queue_name = queue_name

    def consume(self):
        credentials = pika.PlainCredentials(
            username='myuser', password='mypassword')
        connection = pika.BlockingConnection(
            pika.ConnectionParameters(host='myhost', credentials=credentials))
        channel = connection.channel()
        channel.exchange_declare(exchange='myexchange', exchange_type='topic')
        channel.queue_declare(queue=self.queue_name, durable=True)
        channel.queue_bind(
            exchange='myexchange', queue=self.queue_name, routing_key='my.route')
        channel.basic_consume(
            consumer_callback=self.message_received, queue=self.queue_name)
        channel.start_consuming()

    def message_received(self, channel, basic_deliver, properties, body):
        print(f'Message received: {body}')
        channel.basic_ack(delivery_tag=basic_deliver.delivery_tag)
You can assume each consumer server does something similar to:
c = Consumer('myuniquequeue') # each consumer has a permanent queue name
c.consume()
Edit - adding publisher code:
def publish(message):
    credentials = pika.PlainCredentials(
        username='myuser', password='mypassword')
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host='myhost', credentials=credentials))
    channel = connection.channel()
    channel.exchange_declare(exchange='myexchange', exchange_type='topic')
    channel.confirm_delivery()
    success = channel.basic_publish(
        exchange='myexchange',
        routing_key='my.route',
        body=message,
        properties=pika.BasicProperties(
            delivery_mode=2,  # make message persistent
        ),
        mandatory=True
    )
    if success:
        print("Message sent")
    else:
        print("Could not send message")
        # Save for sending later
It is worth saying that I am handling the error case on my own, and it is not the part I would like to improve. When my messages are lost for some of the consumers, the flow still goes through the success branch.
Upvotes: 2
Views: 5357
Reputation: 1841
Use channel.basic_ack(delivery_tag=basic_deliver.delivery_tag) in your consumer callback method. This acknowledgement tells the broker that the consumer has received and processed the message. If the consumer sends a negative acknowledgement instead, the message will be requeued.
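The acknowledgement behaviour described above could be sketched roughly as follows. This is only an illustration, assuming a pika channel object; `handle_delivery` and its `process` argument are hypothetical names standing in for your own handling logic, not part of pika.

```python
def handle_delivery(channel, basic_deliver, properties, body, process):
    """Ack the message if `process` succeeds, otherwise nack it.

    `process` is a hypothetical callable standing in for application logic.
    """
    try:
        process(body)
    except Exception:
        # Negative acknowledgement: requeue=True asks the broker to put the
        # message back on the queue so it can be delivered again.
        channel.basic_nack(
            delivery_tag=basic_deliver.delivery_tag, requeue=True)
        return False
    # Positive acknowledgement: the broker may now discard the message.
    channel.basic_ack(delivery_tag=basic_deliver.delivery_tag)
    return True
```

In a consumer like the one in the question, message_received could delegate to this helper, passing its own processing function as `process`.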
Edit #1
In order to keep receiving messages during a broker crash, the broker needs to be distributed. RabbitMQ calls this concept Mirrored Queues. Mirrored queues let your queues be replicated across the nodes in your cluster. If one of the nodes hosting the queue goes down, another node hosting a replica of the queue takes over as your broker.
For a complete understanding, refer to the Mirrored Queues documentation.
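Mirroring is enabled through a policy rather than in the consumer or publisher code. As a rough sketch, the request for such a policy could be built like this, assuming the management plugin is enabled on its default port 15672 and a RabbitMQ version that still supports classic mirrored queues (they were removed in 4.0). The helper name `mirror_policy_request`, the policy name `ha-all`, and the queue pattern are illustrative assumptions.

```python
import json
from urllib.parse import quote


def mirror_policy_request(base_url, vhost, policy_name, queue_pattern):
    """Build the URL and JSON body for a management-API policy that mirrors
    every queue matching `queue_pattern` to all nodes in the cluster."""
    # The vhost is percent-encoded, so the default vhost "/" becomes %2F.
    url = "{}/api/policies/{}/{}".format(
        base_url, quote(vhost, safe=""), quote(policy_name, safe=""))
    body = {
        "pattern": queue_pattern,
        # ha-mode "all" mirrors to every node; ha-sync-mode "automatic"
        # synchronises new mirrors without manual intervention.
        "definition": {"ha-mode": "all", "ha-sync-mode": "automatic"},
        "apply-to": "queues",
    }
    return url, json.dumps(body)


url, body = mirror_policy_request(
    "http://myhost:15672", "/", "ha-all", "^myuniquequeue$")
print(url)
print(body)
```

The resulting URL and body would then be sent as an HTTP PUT with administrator credentials; that step is left out here so the sketch runs without a broker.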
Upvotes: 0