user43995

Reputation: 597

Python RabbitMQ - consumer only seeing every second message

I'm testing a RabbitMQ producer/consumer example using Pika 0.98. My producer runs on my local PC, and the consumer runs on an Amazon EC2 instance.

My producer sits in a loop and sends some system properties every second. The problem is that the consumer only reads every second message; the messages in between never show up. For example, my producer prints this (timestamp, CPU % used, RAM used in bytes):


    2014-08-16 14:36:17.576000 -0700,16.0,8050806784
    2014-08-16 14:36:18.578000 -0700,15.5,8064458752
    2014-08-16 14:36:19.579000 -0700,15.0,8075313152
    2014-08-16 14:36:20.580000 -0700,12.1,8074121216
    2014-08-16 14:36:21.581000 -0700,16.0,8077778944
    2014-08-16 14:36:22.582000 -0700,14.2,8075038720

but my consumer is printing out this:


    Received '2014-08-16 14:36:17.576000 -0700,16.0,8050806784'
    Received '2014-08-16 14:36:19.579000 -0700,15.0,8075313152'
    Received '2014-08-16 14:36:21.581000 -0700,16.0,8077778944'

The code for the producer is:



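    import datetime
    import logging
    import time

    import pika
    import psutil
    from dateutil.tz import tzlocal

    logging.getLogger('pika').setLevel(logging.DEBUG)

    connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='54.191.161.213'))
    channel = connection.channel()

    # Declare the queue so it exists before publishing to it.
    channel.queue_declare(queue='ems.data')

    try:
        while True:
            # One sample per second: timestamp, CPU % used, RAM used (bytes).
            now = datetime.datetime.now(tzlocal())
            timestamp = now.strftime('%Y-%m-%d %H:%M:%S.%f %z')
            msg = "%s,%.1f,%d" % (timestamp, psutil.cpu_percent(),
                                  psutil.virtual_memory().used)
            # Publish through the default exchange, which routes by queue name.
            channel.basic_publish(exchange='',
                                  routing_key='ems.data',
                                  body=msg)
            print(msg)
            time.sleep(1)
    except KeyboardInterrupt:
        pass
    finally:
        # Close the connection when the loop is interrupted (e.g. CTRL+C).
        connection.close()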

And the code for the consumer is:



    import pika

    connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='0.0.0.0'))
    channel = connection.channel()

    # The queue name must match the one the producer publishes to.
    channel.queue_declare(queue='ems.data')

    print(' [*] Waiting for messages. To exit press CTRL+C')

    def callback(ch, method, properties, body):
        print(" [x] Received %r" % (body,))

    # no_ack=True: messages count as acknowledged as soon as they are delivered.
    channel.basic_consume(callback,
                          queue='ems.data',
                          no_ack=True)

    channel.start_consuming()

Upvotes: 3

Views: 2457

Answers (1)

dano

Reputation: 94871

Your code is fine logically, and runs without issue on my machine. The behavior you're seeing suggests that you may have accidentally started two consumers on the same queue. RabbitMQ hands messages to a queue's consumers round-robin, so each consumer would receive every other message. Try either killing the extra consumer (if you can find it), or rebooting the machine it runs on.
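
To see why a second consumer would produce exactly this pattern, here is a small simulation. It does not talk to RabbitMQ at all; it only models round-robin dispatch in plain Python. The function name `dispatch_round_robin` and the consumer labels are made up for illustration, and the timestamps are shortened versions of the ones in the question.

```python
from itertools import cycle


def dispatch_round_robin(messages, consumer_names):
    """Give each message to the next consumer in turn.

    A simplified model of how a broker spreads messages across
    several consumers attached to the same queue.
    """
    received = dict((name, []) for name in consumer_names)
    turn = cycle(consumer_names)
    for message in messages:
        received[next(turn)].append(message)
    return received


# Shortened timestamps standing in for the producer's six messages.
timestamps = ["14:36:17", "14:36:18", "14:36:19",
              "14:36:20", "14:36:21", "14:36:22"]

# Hypothetical labels: the consumer you are watching, and a stray one.
consumers = ["visible consumer", "forgotten consumer"]

received = dispatch_round_robin(timestamps, consumers)
for name in consumers:
    print("%s: %s" % (name, received[name]))
```

The "visible consumer" ends up with 14:36:17, 14:36:19 and 14:36:21, the same gaps as in the question, while the other three messages go to the consumer nobody is watching. If you have shell access to the broker, `rabbitmqctl list_queues name consumers` should show how many consumers are attached to each queue, which is one way to confirm whether a stray one exists.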

Upvotes: 10
