Reputation: 1320
I am experiencing some dropouts on consumers I have setup using the pika library for rabbitmq. Along with pika I am using the twisted implementation to setup async consumers. I am not sure why this is happening but I wish to implement a reconnect if the consumer drops out and not sure how to go about doing this. Here is my current implementation
class Consumer(object):
def __init__(self, queue, exchange, routingKey, medium, signalRcallbackFunc):
self._queue_name = queue
self.exchange = exchange
self.routingKey = routingKey
self.medium = medium
print "client on"
self.channel = None
self.medium.client.on(signalRcallbackFunc, self.callback)
def on_connected(self, connection):
d = connection.channel()
d.addCallback(self.got_channel)
d.addCallback(self.queue_declared)
d.addCallback(self.queue_bound)
d.addCallback(self.handle_deliveries)
d.addErrback(log.err)
def got_channel(self, channel):
self.channel = channel
self.channel.basic_qos(prefetch_count=500)
return self.channel.queue_declare(queue=self._queue_name, durable=True)
def queue_declared(self, queue):
self.channel.queue_bind(queue=self._queue_name,
exchange=self.exchange,
routing_key=self.routingKey)
def queue_bound(self, ignored):
return self.channel.basic_consume(queue=self._queue_name)
def handle_deliveries(self, queue_and_consumer_tag):
queue, consumer_tag = queue_and_consumer_tag
self.looping_call = task.LoopingCall(self.consume_from_queue, queue)
return self.looping_call.start(0)
def consume_from_queue(self, queue):
d = queue.get()
return d.addCallback(lambda result: self.handle_payload(*result))
def handle_payload(self, channel, method, properties, body):
print(body)
print(properties.headers)
channel.basic_ack(method.delivery_tag)
print "#####################################" + method.delivery_tag + "###################################"
def callback(self, data):
#self.channel.basic_ack(data, multiple=True)
pass
Upvotes: 4
Views: 4806
Reputation: 15788
Is there any reason you can't just close the connection and reopen it?
@contextmanager
def with_pika_connection():
credentials = pika.PlainCredentials(worker_config.username, worker_config.password)
connection = pika.BlockingConnection(pika.ConnectionParameters(
host=worker_config.host,
credentials=credentials,
port=worker_config.port,
))
try:
yield connection
finally:
connection.close()
@contextmanager
def with_pika_channel(queuename):
with with_pika_connection() as connection:
channel = connection.channel()
while True:
while not stopping:
try:
with with_pika_channel(queuename) as (connection, channel):
consumer_tag = channel.basic_consume(
callback,
queue=queuename,
)
channel.start_consuming()
except Exception as e:
reportException(e)
# Continue
Upvotes: 1
Reputation: 46
You could register an 'on-close' handler with the connection within the on_connected callback. This gets called when the connection is lost. Here, you can re-establish a new connection.
The following example is relatively useful and it's a strategy I have used to good effect... http://pika.readthedocs.io/en/latest/examples/asynchronous_consumer_example.html
For the twisted pika library the add_on_close_callback
method will probably get you quite far (although I have not tested). https://pika.readthedocs.io/en/0.10.0/modules/adapters/twisted.html
Upvotes: 2