Reputation: 2342
I would like to run a process periodically (say, once every 10 minutes or once an hour) that gets all the messages from a queue, processes them, and then exits. Is there any way to do this with pika, or should I use a different Python library?
Upvotes: 5
Views: 8842
Reputation: 317
This example is based on my own library, amqpstorm, but you could easily implement the same with pika as well.
Updated for amqpstorm 2.6.1:
from amqpstorm import Connection

connection = Connection('127.0.0.1', 'guest', 'guest')
channel = connection.channel()
channel.queue.declare('simple_queue')
while True:
    result = channel.basic.get(queue='simple_queue', no_ack=False)
    if not result:
        print("Channel Empty.")
        # We are done, let's break the loop and stop the application.
        break
    print("Message:", result.body)
    channel.basic.ack(result.method['delivery_tag'])
channel.close()
connection.close()
Upvotes: 1
Reputation: 26342
I think an ideal solution here would be to use the basic_get method. It fetches a single message, but if the queue is already empty it returns None. The advantage of this is that you can clear the queue with a simple loop and break out of it once None is returned. It is also safe to run basic_get with multiple consumers.
This example is based on my own library, amqpstorm, but you could easily implement the same with pika as well.
from amqpstorm import Connection

connection = Connection('127.0.0.1', 'guest', 'guest')
channel = connection.channel()
channel.queue.declare('simple_queue')
while True:
    result = channel.basic.get(queue='simple_queue', no_ack=False)
    if not result:
        print("Channel Empty.")
        # We are done, let's break the loop and stop the application.
        break
    print("Message:", result['body'])
    channel.basic.ack(result['method']['delivery_tag'])
channel.close()
connection.close()
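Since the question asks about pika specifically, here is a rough sketch of the same drain loop using pika's basic_get. It assumes pika 1.0 or later, where basic_get takes auto_ack and returns a (method, properties, body) tuple whose elements are all None when the queue is empty. The names drain_queue, run_once and handle are made up for this illustration, and the host and queue name are placeholders carried over from the example above.

```python
def drain_queue(channel, queue_name, handle):
    """Fetch messages one at a time until the queue is empty.

    Returns the number of messages handled. `channel` is expected to
    behave like a pika BlockingChannel (assumption: pika >= 1.0).
    """
    processed = 0
    while True:
        method, properties, body = channel.basic_get(queue=queue_name,
                                                     auto_ack=False)
        if method is None:
            # Queue is empty: we are done.
            break
        handle(body)
        # Acknowledge only after the message has been handled.
        channel.basic_ack(delivery_tag=method.delivery_tag)
        processed += 1
    return processed


def run_once(host="127.0.0.1", queue_name="simple_queue"):
    """Hypothetical entry point for a cron job: connect, drain, exit."""
    # Imported here so drain_queue can be tried without pika or a broker.
    import pika

    connection = pika.BlockingConnection(pika.ConnectionParameters(host))
    try:
        channel = connection.channel()
        channel.queue_declare(queue=queue_name)
        return drain_queue(channel, queue_name,
                           lambda body: print("Message:", body))
    finally:
        connection.close()
```

Calling run_once() from a script scheduled every 10 minutes or every hour should give the "process everything, then exit" behaviour asked for. Messages published while the loop is running are picked up too, so the loop ends only when a basic_get comes back empty.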
Upvotes: 9
Reputation: 34272
Would this work for you:
Read the number of queued messages, N = queue.method.message_count.
Once N messages are processed, call channel.stop_consuming.
So, client code would be something like this:
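import pika

class CountCallback(object):
    def __init__(self, count):
        self.count = count

    def __call__(self, ch, method, properties, body):
        # process the message here
        ch.basic_ack(delivery_tag=method.delivery_tag)
        self.count -= 1
        if not self.count:
            ch.stop_consuming()

# Assumes a broker on localhost; adjust the connection parameters as needed.
conn = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = conn.channel()
queue = channel.queue_declare('tasks')
callback = CountCallback(queue.method.message_count)
# pika >= 1.0 signature; older releases used basic_consume(callback, queue='tasks').
channel.basic_consume(queue='tasks', on_message_callback=callback)
# With an empty queue the counter would never reach zero, so skip consuming.
if callback.count:
    channel.start_consuming()
conn.close()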
Upvotes: 3