vadlamani maitreya

Reputation: 21

A good heartbeat interval for pika/RabbitMQ on Amazon EC2

I am using the latest pika library (0.9.9+) for RabbitMQ. I use RabbitMQ and pika as follows:

  1. I have long-running tasks (about 5 minutes each) as workers. These tasks take their requests from RabbitMQ. Requests arrive very infrequently, so there are long idle periods between them.
  2. The problem I was facing previously was connections being closed because they were idle. So I enabled heartbeats in pika.
  3. Now choosing the heartbeat interval is the problem. Pika seems to be a single-threaded library, so heartbeats are only received and answered between requests, not while a callback is running.
  4. So if the heartbeat interval is shorter than the time the callback spends on its long-running computation, the server receives no heartbeats from the client and closes the connection.
  5. So I assume the heartbeat interval should be at least the maximum computation time of the callback when using a blocking connection.
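
The reasoning in points 3 to 5 can be written down as a small model. This is a sketch under stated assumptions, not pika's actual behaviour: it assumes the client services heartbeats whenever it is waiting for messages (so idle time is never silent) and sends nothing at all while a callback runs. It also collapses the broker's real rule (RabbitMQ treats the peer as unreachable after roughly two missed heartbeats) into a single hypothetical `timeout_seconds`.

```python
from typing import Sequence


def longest_silence(callback_seconds: Sequence[float]) -> float:
    """In a single-threaded blocking consumer nothing is sent to the broker
    while a callback runs, so the longest silence is the longest callback."""
    return max(callback_seconds, default=0.0)


def connection_survives(timeout_seconds: float, callback_seconds: Sequence[float]) -> bool:
    """Simplified model (assumption): the broker drops the connection if it
    hears nothing from the client for longer than `timeout_seconds`."""
    return longest_silence(callback_seconds) <= timeout_seconds


# Roughly five-minute jobs, as in the question, against two candidate timeouts.
jobs = [300.0, 280.0, 295.0]
print(connection_survives(60, jobs))   # False
print(connection_survives(600, jobs))  # True
```

Under this model the constraint in point 5 falls out directly: the timeout has to cover the longest callback, and any safety margin on top of that is a judgment call.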

What is a good heartbeat value on Amazon EC2 to prevent idle connections from being closed?

Also, some people suggest using RabbitMQ keepalive (or libkeepalive) to maintain TCP connections. I think keeping the connection alive at the TCP layer is much better, because the application does not need to manage it. Is this true? Is keepalive a good alternative to RabbitMQ heartbeats?
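
For reference, this is roughly what TCP keepalive looks like at the socket level, using only Python's standard `socket` module. It is a sketch: the `enable_keepalive` helper and its default values are hypothetical, the per-socket tuning constants are platform-specific (they are guarded with `hasattr` here), and how or whether a given pika version lets you apply them to its own socket is not shown. Keepalive probes are answered by the operating system's TCP stack, so they keep the flow from looking idle to the network but say nothing about whether the consuming process itself is responsive.

```python
import socket


def enable_keepalive(sock: socket.socket, idle: int = 60,
                     interval: int = 10, count: int = 5) -> None:
    """Turn on TCP keepalive for `sock` (hypothetical helper, example values)."""
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
    # Seconds of idleness before the first probe (Linux constant name).
    if hasattr(socket, "TCP_KEEPIDLE"):
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, idle)
    # Seconds between unanswered probes.
    if hasattr(socket, "TCP_KEEPINTVL"):
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, interval)
    # Unanswered probes before the kernel declares the connection dead.
    if hasattr(socket, "TCP_KEEPCNT"):
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, count)


sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
enable_keepalive(sock)
print(sock.getsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE) != 0)
sock.close()
```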

I have seen suggestions to use multiple threads and a queue for long-running tasks. But is this the only option? It is quite disappointing that another queue must be used for this scenario.
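
The multi-threaded pattern mentioned above can be sketched with the standard library alone. The long task runs on a worker thread while the calling thread stays free to service the connection between short waits; here the hand-off uses a `concurrent.futures` future rather than an explicit queue. `service_io` is a hypothetical callback standing in for whatever lets the client library process its socket (in newer pika releases that would be something like `BlockingConnection.process_data_events(time_limit=...)`, but check your version). Pika connections are not thread-safe, so in a real consumer the worker thread should not touch the channel, and the ack should happen on the connection's thread after the future completes.

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait
from typing import Callable


def long_running_task(seconds: float) -> str:
    time.sleep(seconds)  # stands in for the real multi-minute computation
    return "done"


def run_with_heartbeats(task_seconds: float, tick_seconds: float,
                        service_io: Callable[[], None]) -> str:
    """Run the task on a worker thread and call `service_io` every
    `tick_seconds` on the calling thread until the task finishes."""
    with ThreadPoolExecutor(max_workers=1) as pool:
        future = pool.submit(long_running_task, task_seconds)
        while True:
            done, _ = wait([future], timeout=tick_seconds)
            if done:
                break
            service_io()  # hypothetical: let the client send/receive heartbeats
        return future.result()


ticks = []
print(run_with_heartbeats(0.5, 0.05, lambda: ticks.append(1)))  # done
```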

Thank you in advance. I think I have detailed the problem. Let me know if I can provide more details.

Upvotes: 2

Views: 3480

Answers (1)

Gerald Manipon

Reputation: 140

If you're not tied to using pika, this thread helped me achieve what you're trying to do using kombu:

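#!/usr/bin/env python
# Monkey-patch before importing anything that uses sockets or time,
# so kombu/amqp and time.sleep cooperate with eventlet green threads.
import eventlet
eventlet.monkey_patch()

import logging
import time
import weakref

from eventlet import spawn_after
from kombu import Connection, Exchange, Queue
from kombu.common import eventloop
from kombu.utils.debug import setup_logging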

log_format = ('%(levelname) -10s %(asctime)s %(name) -30s %(funcName) '
              '-35s %(lineno) -5d: %(message)s')
logging.basicConfig(level=logging.INFO, format=log_format)
logger = logging.getLogger('job_worker')
logger.setLevel(logging.INFO)


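def long_running_function(body):
    # time.sleep is green after monkey_patch(), so this yields to the
    # heartbeat green thread. A CPU-bound loop that never yields (no I/O,
    # no eventlet.sleep) would starve it, just like a blocking pika callback.
    time.sleep(300)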

def job_worker(body, message):
    long_running_function(body)
    message.ack()

def monitor_heartbeats(connection, rate=2):
    """Send heartbeat checks to RabbitMQ from a green thread. This keeps the
    connection alive while a long-running callback is executing."""
    if not connection.heartbeat:
        logger.info("No heartbeat set for connection: %s", connection.heartbeat)
        return None
    # kombu's `rate` is how often the check runs relative to the heartbeat
    # value, so with heartbeat=300 and rate=2 the check runs every 150 s.
    interval = connection.heartbeat / rate
    # Weak reference so this loop does not keep a closed connection alive.
    cref = weakref.ref(connection)
    logger.info("Starting heartbeat monitor.")

    def heartbeat_check():
        conn = cref()
        if conn is not None and conn.connected:
            conn.heartbeat_check(rate=rate)
            logger.info("Ran heartbeat check.")
            # Re-arm the timer for the next check.
            spawn_after(interval, heartbeat_check)

    return spawn_after(interval, heartbeat_check)

def main():
    setup_logging(loglevel='INFO')

    try:
        with Connection('amqp://guest:guest@localhost:5672//', heartbeat=300) as conn:
            conn.ensure_connection()
            monitor_heartbeats(conn)
            queue = Queue('job_queue',
                          Exchange('job_queue', type='direct'),
                          routing_key='job_queue')
            logger.info("Starting worker.")
            with conn.Consumer(queue, callbacks=[job_worker]) as consumer:
                consumer.qos(prefetch_count=1)
                for _ in eventloop(conn, timeout=1, ignore_timeouts=True):
                    pass
    except KeyboardInterrupt:
        logger.info("Worker was shut down.")

if __name__ == "__main__":
    main()

I stripped out my domain-specific code, but this is essentially the framework I use.
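
One caveat about this approach: eventlet green threads are cooperative, so the heartbeat green thread only runs when the worker yields (green `time.sleep`, socket I/O, `eventlet.sleep`). The stdlib-only simulation below models that with generators and a virtual clock; all names in it are hypothetical and it is an illustration of cooperative scheduling, not of kombu or eventlet internals. A worker that yields every 10 virtual seconds lets heartbeats fire on schedule, while one that never yields delays the first heartbeat until the whole job is finished.

```python
from typing import Iterator, List


def heartbeat_task(clock: List[int], interval: int, log: List[int]) -> Iterator[None]:
    """Log a 'heartbeat' on each turn where `interval` virtual seconds have passed."""
    last = 0
    while True:
        if clock[0] - last >= interval:
            log.append(clock[0])
            last = clock[0]
        yield


def worker_task(clock: List[int], seconds: int, yield_every: int) -> Iterator[None]:
    """Advance the virtual clock one second at a time for `seconds` seconds,
    yielding to the scheduler every `yield_every` seconds (0 means never)."""
    for s in range(1, seconds + 1):
        clock[0] += 1
        if yield_every and s % yield_every == 0:
            yield


def run(seconds: int, yield_every: int, interval: int) -> List[int]:
    """Alternate between the worker and the heartbeat task; return heartbeat times."""
    clock = [0]
    log: List[int] = []
    hb = heartbeat_task(clock, interval, log)
    for _ in worker_task(clock, seconds, yield_every):
        next(hb)  # each yield from the worker hands control to the heartbeat task
    next(hb)      # one last turn after the worker finishes
    return log


print(run(300, yield_every=10, interval=60))  # [60, 120, 180, 240, 300]
print(run(300, yield_every=0, interval=60))   # [300]
```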

Upvotes: 4
