Reputation: 35099
I'm looking for help fixing my insert_order_queue() function so that it
resends a message to RabbitMQ if the message was not actually delivered
to the server.
This is my current code:
def insert_order_queue(self, msg):
    ''' Insert message into the queue '''
    if msg:
        msg_props = pika.BasicProperties(delivery_mode=conf.rabbit_msg_props_delivery_mode,
                                         content_type=conf.rabbit_msg_props_content_type)
        logger.info('Message : %s' % msg)
        try:
            self.channel.basic_publish(body=json.dumps(msg),
                                       exchange=conf.rabbit_exchange_name,
                                       properties=msg_props,
                                       routing_key=conf.rabbit_exchange_routing_key)
        except (pika.exceptions.AMQPConnectionError, pika.exceptions.AMQPChannelError), error:
            logger.error('AMQP Connection failed. Trying again... %s' % error)
            self._connect()
            return
    else:
        logger.error('Something wrong')
And this is my _connect() method:
def _connect(self):
    ''' Connecting to the RabbitMQ, and declare queue '''
    logger.info('Trying to connect to RabbitMQ')
    while True:
        try:
            conn_broker = pika.BlockingConnection(
                pika.ConnectionParameters(
                    host=conf.rabbit_server,
                    port=conf.rabbit_port,
                    virtual_host=conf.rabbit_vhost,
                    ssl=conf.rabbit_ssl,  # do not set it to True if there is no ssl!
                    heartbeat_interval=conf.rabbit_heartbeat_interval,
                    credentials=pika.PlainCredentials(
                        conf.rabbit_user,
                        conf.rabbit_pass)))
            logger.info('Successfully connected to Rabbit at %s:%s' % (conf.rabbit_server, conf.rabbit_port))
            channel = conn_broker.channel()
            # Don't dispatch a new message to a worker until it has processed and acknowledged the previous one
            channel.basic_qos(prefetch_count=conf.rabbit_prefetch_count)
            status = channel.queue_declare(queue=conf.rabbit_queue_name,
                                           durable=conf.rabbit_queue_durable,
                                           exclusive=conf.rabbit_queue_exclusive,
                                           passive=conf.rabbit_queue_passive)
            if status.method.message_count == 0:
                logger.info("Queue empty")
            else:
                logger.info('Queue status: %s' % status)
            channel.queue_bind(
                queue=conf.rabbit_queue_name,
                exchange=conf.rabbit_exchange_name,
                routing_key=conf.rabbit_exchange_routing_key)
            return channel
        except (pika.exceptions.AMQPConnectionError, pika.exceptions.AMQPChannelError), error:
            time.sleep(3)
            logger.error('Exception while connecting to Rabbit %s' % error)
        else:
            break
Upvotes: 0
Views: 1548
Reputation: 179
There are a couple of ways a message can fail to be "delivered".
The most obvious is that the connection to RabbitMQ is closed, in which case you just reconnect and resend. You already have most of the reconnection logic; you just need to resend the message after reconnecting.
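A minimal sketch of the reconnect-and-resend idea, kept independent of pika so it can be run on its own. The helper name publish_with_retry and its parameters (publish, reconnect, retry_on, max_attempts, delay) are made up for illustration and are not part of pika or of the code in the question.

```python
import time


def publish_with_retry(publish, reconnect, retry_on, max_attempts=3, delay=0.0):
    """Call publish(); on a retryable error, reconnect and try again.

    publish   -- zero-argument callable that sends the message
    reconnect -- zero-argument callable that rebuilds the connection/channel
    retry_on  -- exception class (or tuple of classes) treated as retryable
    Returns the number of attempts used; re-raises the last error once
    max_attempts publishes have failed.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            publish()
            return attempt
        except retry_on:
            if attempt == max_attempts:
                raise  # give up and let the caller decide what to do
            time.sleep(delay)
            reconnect()  # must leave a usable channel for the next attempt
```

In the question's class, publish would presumably be a lambda wrapping the existing self.channel.basic_publish(...) call, and retry_on the same tuple of pika exceptions the question already catches. One thing to check: as posted, _connect() returns the new channel but the except branch discards the return value, so unless self.channel is reassigned elsewhere, reconnect should be something like a small wrapper that does self.channel = self._connect() before the retry.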
Then there are a couple of variations on "no one is listening for this message". These can be handled with the immediate and mandatory flags on basic_publish. See this for more information: http://bunnyamqp.wordpress.com/2009/08/21/amqp-basic-publish-immediate-versus-mandatory/
Finally, you can add a confirmation callback. Pika lets you set this callback:
https://github.com/pika/pika/blob/master/pika/channel.py#L387
From within that callback you can decide whether or not to send the message again.
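A rough sketch of how confirmations might be checked with the blocking channel used in the question. As far as I know, a BlockingConnection channel reports confirms synchronously from basic_publish rather than through a callback, and how it reports them depends on the pika version: older releases return False for an unconfirmed message, newer ones raise an exception instead. The helper publish_confirmed and its unconfirmed_errors parameter are hypothetical names, and the sketch assumes channel.confirm_delivery() has already been called once on the channel.

```python
def publish_confirmed(channel, exchange, routing_key, body, properties=None,
                      unconfirmed_errors=()):
    """Publish one message and return True only if it was confirmed.

    Assumes confirm_delivery() was enabled on `channel` beforehand.
    unconfirmed_errors is the tuple of exception classes your pika version
    raises for a nacked or returned message (an assumption; check the
    docs for the version you have installed).
    """
    try:
        result = channel.basic_publish(exchange=exchange,
                                       routing_key=routing_key,
                                       body=body,
                                       properties=properties,
                                       mandatory=True)
    except unconfirmed_errors:
        return False
    # Channels that signal failure by return value give False here;
    # channels that signal failure by raising return None on success.
    return result is not False
```

With a recent pika you would probably pass (pika.exceptions.UnroutableError, pika.exceptions.NackError) as unconfirmed_errors; with an older one the empty default should do. When the function returns False, the caller can decide whether to resend, for example by feeding it back into the same retry logic used for connection errors.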
Upvotes: 1