alphanumeric

Reputation: 19329

How to change timeout using RabbitMQ pika.basic_consume in Python

With the RabbitMQ Python client running subscriber.py:

    import pika, time 

    credentials = pika.PlainCredentials('user', 'pass')
    parameters = pika.ConnectionParameters(host='localhost', port=6672, credentials=credentials)
    connection = pika.BlockingConnection(parameters)
    channel = connection.channel()
    channel.basic_qos(prefetch_count=1)
    channel.queue_declare(queue='my_queue')

    def callback(ch, method, properties, body):
        ch.basic_ack(delivery_tag=method.delivery_tag)
        time.sleep(600)
        print('process completed')

    channel.basic_consume(queue='my_queue', on_message_callback=callback)
    channel.start_consuming()

the connection breaks after the callback function completes. It always seems to happen at the 60-second mark. It looks as if channel.basic_consume() does not wait for the main thread to finish the callback function. Is there a way to keep the connection from dropping after 60 seconds?

Upvotes: 2

Views: 11259

Answers (2)

Luke Bakken

Reputation: 9637

Your time.sleep call blocks Pika's I/O loop, which prevents heartbeats from being processed, so the connection is eventually closed. Don't block the I/O loop!

Instead, you should do your long-running work in a separate thread and acknowledge the message correctly from that thread. Fortunately, I have an example right here: link
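For illustration, here is a minimal sketch of that approach, not necessarily the same as the linked example. It assumes pika 1.x, where BlockingConnection provides add_callback_threadsafe. The helper names (ack_message, do_work, on_message, main) and the work_seconds parameter are made up for this sketch. Unlike the code in the question, it acknowledges the message only after the work has finished.

```python
import functools
import threading
import time


def ack_message(ch, delivery_tag):
    # Runs on the connection's own thread, so touching the channel is safe here.
    if ch.is_open:
        ch.basic_ack(delivery_tag=delivery_tag)


def do_work(connection, ch, delivery_tag, body, work_seconds):
    # Long-running job, executed in a worker thread (stand-in for real work).
    time.sleep(work_seconds)
    print('process completed')
    # Do not ack from this thread; ask the connection thread to do it.
    cb = functools.partial(ack_message, ch, delivery_tag)
    connection.add_callback_threadsafe(cb)


def on_message(ch, method, properties, body, connection, threads, work_seconds=600):
    # Returns immediately, so the I/O loop keeps servicing heartbeats.
    t = threading.Thread(
        target=do_work,
        args=(connection, ch, method.delivery_tag, body, work_seconds),
    )
    t.start()
    threads.append(t)


def main():
    # Imported here so the helpers above can be loaded without pika installed.
    import pika

    credentials = pika.PlainCredentials('user', 'pass')
    parameters = pika.ConnectionParameters(host='localhost', port=6672, credentials=credentials)
    connection = pika.BlockingConnection(parameters)
    channel = connection.channel()
    channel.basic_qos(prefetch_count=1)
    channel.queue_declare(queue='my_queue')

    threads = []
    callback = functools.partial(on_message, connection=connection, threads=threads)
    channel.basic_consume(queue='my_queue', on_message_callback=callback)
    try:
        channel.start_consuming()
    except KeyboardInterrupt:
        channel.stop_consuming()

    # Wait for in-flight work before closing the connection.
    for t in threads:
        t.join()
    connection.close()
```

Call main() to start consuming. With prefetch_count=1 and the ack deferred until the work is done, the broker should deliver only one unacknowledged message at a time, so at most one worker thread is busy.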

Upvotes: 3

Rodrigo Miranda

Reputation: 11

I think the "heartbeat" parameter solves this problem. Set it to a timeout, in seconds, that is longer than the time your callback takes:

    import pika, time

    credentials = pika.PlainCredentials('user', 'pass')
    parameters = pika.ConnectionParameters(host='localhost', port=6672, credentials=credentials, heartbeat=36000)
    connection = pika.BlockingConnection(parameters)
    channel = connection.channel()
    channel.basic_qos(prefetch_count=1)
    channel.queue_declare(queue='my_queue')

    def callback(ch, method, properties, body):
        ch.basic_ack(delivery_tag=method.delivery_tag)
        time.sleep(600)
        print('process completed')

    channel.basic_consume(queue='my_queue', on_message_callback=callback)
    channel.start_consuming()

Upvotes: 1
