Lostsoul

Reputation: 26037

infinite loop to keep consuming queue

I'm consuming data from a queue to process. My goal is to have the data constantly processing and to not have errors crash the app, so I log exceptions and try to let the program keep running. To do this I nested the consume statement within an infinite loop, but it doesn't seem to be working. Often I'll come back to the program and see it says "[x] Done" and is waiting, while I can see there is a ton of data in the queue.

Here's a snippet of my code:

def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)
    doWork(body)
    print " [x] Done"
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback, queue='dataProcessingQueue')
while True:
    try:
        channel.start_consuming()
    except:
        time.sleep(10)

What am I doing wrong? If my queue has 3000 entries, this will work for 10-15% then for some reason just hangs. Am I doing something wrong with my while loop?

Upvotes: 0

Views: 1576

Answers (2)

tdelaney

Reputation: 77387

You should do your error handling in the callback. I'm not sure whether it's even legal to call start_consuming() again after an error (its internal state could be left in an error condition). You should also log the errors you get so that you know what's happening and can refine the exception handler to only catch recoverable errors. I couldn't test this, so pardon any minor errors.

import logging
import traceback

# NOTE: Just a simple logging config here, you can get fancier
logging.basicConfig(level=logging.DEBUG)

def callback(ch, method, properties, body):
    logger = logging.getLogger('callback')
    try:
        logger.info(" [x] Received %r" % (body,))
        doWork(body)
        logger.info(" [x] Done")
    except Exception as e:
        # get granular over time as you learn what
        # errors you get because some things like
        # SyntaxError should not be dropped
        logger.error("Exception %s: %s" % (type(e), e))
        logger.debug(traceback.format_exc())
    finally:
        # set to always ack... even on failure
        ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback, queue='dataProcessingQueue')
channel.start_consuming()
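To illustrate what "getting granular" could look like, here is a pika-free sketch. The doWork stand-in and the ack/nack callables are hypothetical (in real pika code they would be ch.basic_ack and ch.basic_nack with a delivery tag); the point is the pattern: ack messages whose failures are known to be unrecoverable so they don't poison the queue, and nack on unexpected errors so the broker can redeliver them.

```python
import logging
import traceback

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger('callback')

def doWork(body):
    # Hypothetical worker: parsing may raise ValueError on bad input,
    # TypeError on a missing body, etc.
    return int(body)

def callback_granular(body, ack, nack):
    try:
        result = doWork(body)
        logger.info(" [x] Done: %r" % (result,))
        ack()
    except ValueError as e:
        # Known-unrecoverable message: log it and ack so it is dropped
        # rather than redelivered forever.
        logger.error("Bad message %r: %s" % (body, e))
        logger.debug(traceback.format_exc())
        ack()
    except Exception:
        # Unknown failure: keep the message for redelivery.
        logger.error("Unexpected failure on %r" % (body,))
        logger.debug(traceback.format_exc())
        nack()
```

With this shape, a parse error on "oops" gets acked and dropped, while an unexpected TypeError (say, a None body) gets nacked for redelivery, and every path leaves a log trail you can use to refine the handlers.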

Upvotes: 1

JimmyK

Reputation: 1040

I see you are using RabbitMQ here. If so, this is what you have to do:

def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)
    doWork(body)
    print " [x] Done"
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback, queue='dataProcessingQueue')
channel.start_consuming()

Yes, there is no while True loop wrapping the start_consuming function.

Reference: RabbitMQ Tutorial

Upvotes: 0
