Alexander Putilin

Reputation: 2342

Pika: how to consume messages synchronously

I would like to run a process periodically (say, once every 10 minutes or once an hour) that gets all the messages from a queue, processes them, and then exits. Is there any way to do this with pika, or should I use a different Python library?

Upvotes: 5

Views: 8842

Answers (3)

Fardin Allahverdi

Reputation: 317

This is @eandersson's example, which is based on the amqpstorm library, updated for amqpstorm 2.6.1 (the message is now read through attributes such as result.body rather than result['body']). The same approach can be implemented with pika as well.

from amqpstorm import Connection

connection = Connection('127.0.0.1', 'guest', 'guest')
channel = connection.channel()
channel.queue.declare('simple_queue')
while True:
    result = channel.basic.get(queue='simple_queue', no_ack=False)
    if not result:
        print("Channel Empty.")
        # We are done, so let's break the loop and stop the application.
        break
    print("Message:", result.body)
    channel.basic.ack(result.method['delivery_tag'])
channel.close()
connection.close()

Upvotes: 1

eandersson

Reputation: 26342

I think an ideal solution here would be to use the basic_get method. It fetches a single message, or returns None if the queue is already empty. This lets you drain the queue with a simple loop that breaks as soon as None is returned. It is also safe to run basic_get with multiple consumers.

This example is based on my own library; amqpstorm, but you could easily implement the same with pika as well.

from amqpstorm import Connection

connection = Connection('127.0.0.1', 'guest', 'guest')
channel = connection.channel()
channel.queue.declare('simple_queue')
while True:
    result = channel.basic.get(queue='simple_queue', no_ack=False)
    if not result:
        print("Channel Empty.")
        # We are done, so let's break the loop and stop the application.
        break
    print("Message:", result['body'])
    channel.basic.ack(result['method']['delivery_tag'])
channel.close()
connection.close()
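
For readers who want to stay with pika, the same drain-until-empty loop might look roughly like the sketch below. It assumes pika 1.x, where BlockingChannel.basic_get returns a (method, properties, body) tuple and (None, None, None) when the queue is empty. The names drain_queue, handle and main are made up for this illustration, and the host and queue name are simply copied from the amqpstorm example.

```python
def drain_queue(channel, queue_name, handle):
    """Fetch messages one at a time until the queue is empty.

    Returns the number of messages processed. `channel` is expected to
    behave like a pika BlockingChannel (pika 1.x assumed).
    """
    processed = 0
    while True:
        method, properties, body = channel.basic_get(queue=queue_name, auto_ack=False)
        if method is None:
            # Queue is empty: nothing more to fetch in this run.
            break
        handle(body)
        # Acknowledge only after the handler has returned without raising.
        channel.basic_ack(delivery_tag=method.delivery_tag)
        processed += 1
    return processed


def main():
    # Imported here so drain_queue can be exercised without pika installed.
    import pika

    # Assumption: a broker on 127.0.0.1 with default credentials.
    connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1'))
    try:
        channel = connection.channel()
        channel.queue_declare(queue='simple_queue')
        count = drain_queue(channel, 'simple_queue',
                            lambda body: print("Message:", body))
        print("Processed", count, "messages")
    finally:
        connection.close()
```

Calling main() from a cron job or similar scheduler would give the "run, drain, exit" behaviour asked about in the question.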

Upvotes: 9

bereal

Reputation: 34272

Would this work for you:

  1. Measure the current queue length as N = queue.method.message_count
  2. Make the callback count the processed messages and as soon as N are processed, call channel.stop_consuming.

So, client code would be something like this:

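import pika

class CountCallback(object):
    def __init__(self, count):
        self.count = count

    def __call__(self, ch, method, properties, body):
        # process the message here, then acknowledge it so it is not redelivered
        ch.basic_ack(delivery_tag=method.delivery_tag)
        self.count -= 1
        if not self.count:
            ch.stop_consuming()

# assumes a broker running on localhost
conn = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = conn.channel()
queue = channel.queue_declare('tasks')
count = queue.method.message_count
if count:  # with an empty queue the callback never fires and start_consuming would block
    callback = CountCallback(count)
    # pika >= 1.0 signature; older releases used basic_consume(callback, queue='tasks')
    channel.basic_consume(queue='tasks', on_message_callback=callback)
    channel.start_consuming()
conn.close()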
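
One caveat with counting: if another consumer takes some of the N messages first, the counter never reaches zero and the process keeps waiting. A possible variant, sketched here under the assumption of pika 1.x, uses the BlockingChannel.consume generator with inactivity_timeout so the loop also ends when no message arrives for a while. The function name consume_n, the handle parameter and the 5-second default are made up for this illustration.

```python
def consume_n(channel, queue_name, n, handle, inactivity_timeout=5):
    """Process at most n messages, stopping early if the queue goes quiet.

    `channel` is expected to behave like a pika BlockingChannel (pika 1.x
    assumed). Returns the number of messages processed.
    """
    if n <= 0:
        return 0
    processed = 0
    for method, properties, body in channel.consume(
            queue_name, inactivity_timeout=inactivity_timeout):
        if method is None:
            # No message arrived within the timeout: stop waiting.
            break
        handle(body)
        channel.basic_ack(delivery_tag=method.delivery_tag)
        processed += 1
        if processed >= n:
            break
    # Cancel the consumer; pika requeues any prefetched, unacked messages.
    channel.cancel()
    return processed
```

Here n would be queue.method.message_count from the queue_declare call, as in the answer above.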

Upvotes: 3
