Reputation: 21
I am using the latest pika library (0.9.9+) for RabbitMQ. My usage of RabbitMQ and pika is as follows:
What would be a good heartbeat value on Amazon EC2 to prevent it from closing idle connections?
Also, some suggest using TCP keepalive (or libkeepalive) to keep the connection alive. I think managing heartbeats at the TCP layer is much better because the application does not have to manage them itself. Is this true? Is keepalive a good method compared to RabbitMQ heartbeats?
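To make the two options concrete, this is roughly what I am talking about (just a sketch, not my real configuration; the host and numbers are placeholders, and as far as I can tell the heartbeat keyword is heartbeat_interval on pika 0.9.x/0.10.x and was renamed to heartbeat in pika 1.x):

import socket
import pika

# Option 1: AMQP heartbeats negotiated by the client library.
params = pika.ConnectionParameters(host='my-ec2-host',        # placeholder
                                   heartbeat_interval=30)     # seconds (pika 0.9.x/0.10.x keyword)
# connection = pika.BlockingConnection(params)                # connect once a real host is filled in

# Option 2: TCP keepalive handled by the kernel. Shown on a plain socket here;
# libkeepalive sets the same options via LD_PRELOAD so application code stays
# untouched. The TCP_KEEP* constants are Linux-specific.
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
if hasattr(socket, 'TCP_KEEPIDLE'):
    sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 60)    # seconds idle before first probe
    sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 10)   # seconds between probes
    sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 5)      # failed probes before the link is dropped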
I have seen suggestions to use multiple threads and an internal queue for long-running tasks (roughly the pattern sketched below). But is this the only option for long-running tasks? It is quite disappointing that another queue has to be introduced for this scenario.
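For reference, the threads-plus-hand-off pattern I am referring to looks roughly like this (only a sketch; do_long_task and the queue name are placeholders, process_data_events(time_limit=...) needs pika 0.10+, and the basic_consume argument order changed in pika 1.x):

import threading
import time
import pika


def do_long_task(body):
    time.sleep(300)  # stand-in for the real five-minute job


def on_message(channel, method, properties, body):
    # Run the long job on a worker thread and keep servicing the connection
    # (heartbeats included) from the main thread until it finishes.
    worker = threading.Thread(target=do_long_task, args=(body,))
    worker.start()
    while worker.is_alive():
        connection.process_data_events(time_limit=1)
    channel.basic_ack(delivery_tag=method.delivery_tag)


params = pika.ConnectionParameters('localhost', heartbeat_interval=30)
connection = pika.BlockingConnection(params)
channel = connection.channel()
channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_message, queue='job_queue')   # pre-1.x argument order
channel.start_consuming()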
Thank you in advance. I think I have detailed the problem. Let me know if I can provide more details.
Upvotes: 2
Views: 3480
Reputation: 140
If you're not tied to using pika, this thread helped me achieve what you're trying to do using kombu:
#!/usr/bin/env python
import time, logging, weakref, eventlet

from kombu import Connection, Exchange, Queue
from kombu.utils.debug import setup_logging
from kombu.common import eventloop
from eventlet import spawn_after

eventlet.monkey_patch()

log_format = ('%(levelname) -10s %(asctime)s %(name) -30s %(funcName) '
              '-35s %(lineno) -5d: %(message)s')
logging.basicConfig(level=logging.INFO, format=log_format)
logger = logging.getLogger('job_worker')
logger.setLevel(logging.INFO)


def long_running_function(body):
    time.sleep(300)


def job_worker(body, message):
    long_running_function(body)
    message.ack()


def monitor_heartbeats(connection, rate=2):
    """Function to send heartbeat checks to RabbitMQ. This keeps the
    connection alive over long-running processes."""
    if not connection.heartbeat:
        logger.info("No heartbeat set for connection: %s" % connection.heartbeat)
        return

    interval = connection.heartbeat
    cref = weakref.ref(connection)
    logger.info("Starting heartbeat monitor.")

    def heartbeat_check():
        conn = cref()
        if conn is not None and conn.connected:
            conn.heartbeat_check(rate=rate)
            logger.info("Ran heartbeat check.")
            spawn_after(interval, heartbeat_check)

    return spawn_after(interval, heartbeat_check)


def main():
    setup_logging(loglevel='INFO')

    # process for heartbeat monitor
    p = None

    try:
        with Connection('amqp://guest:guest@localhost:5672//', heartbeat=300) as conn:
            conn.ensure_connection()
            monitor_heartbeats(conn)
            queue = Queue('job_queue',
                          Exchange('job_queue', type='direct'),
                          routing_key='job_queue')
            logger.info("Starting worker.")
            with conn.Consumer(queue, callbacks=[job_worker]) as consumer:
                consumer.qos(prefetch_count=1)
                for _ in eventloop(conn, timeout=1, ignore_timeouts=True):
                    pass
    except KeyboardInterrupt:
        logger.info("Worker was shut down.")


if __name__ == "__main__":
    main()
I stripped out my domain-specific code, but this is essentially the framework I use.
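If you need to push a test job through it, a small kombu producer along these lines should do (a sketch; the payload and broker URL are placeholders):

from kombu import Connection, Exchange, Queue

exchange = Exchange('job_queue', type='direct')
queue = Queue('job_queue', exchange, routing_key='job_queue')

with Connection('amqp://guest:guest@localhost:5672//') as conn:
    producer = conn.Producer()
    # declare=[queue] makes sure the queue exists before publishing
    producer.publish({'task': 'example'},
                     exchange=exchange,
                     routing_key='job_queue',
                     declare=[queue],
                     serializer='json')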
Upvotes: 4