mpioski

Reputation: 48

How do I get a Celery worker to consume an 'outside' RabbitMQ queue?

I have the following scripts:

celery_tasks.py

from celery import Celery
app = Celery(broker='amqp://guest:guest@localhost:5672//')
app.conf.task_default_queue = 'test_queue'

@app.task(acks_late=True)
def test(a):
    return a

publish.py

from celery_tasks import test
test.delay('abc')

When I run publish.py and start the worker (celery -A celery_tasks worker --loglevel=DEBUG), the 'abc' content is published to 'test_queue' and consumed by the worker.

Is there a way for the worker to consume a message from a queue that was not published by Celery? For example, when I put something in test_queue directly through RabbitMQ, without going through the Celery publisher, and then run the Celery worker, I get the following warning:

WARNING/MainProcess] Received and deleted unknown message. Wrong destination?!?

The full contents of the message body was: body: 'abc' (3b)

{content_type:None content_encoding:None delivery_info:{'exchange': '', 'redelivered': False, 'delivery_tag': 1, 'consumer_tag': 'None2', 'routing_key': 'test_queue'} headers={}}

Is there a way to solve this?

Upvotes: 2

Views: 3001

Answers (2)

rkirmizi

Reputation: 364

It's a late answer, but custom message consumers might help you. I use them to consume messages from RabbitMQ that another app publishes with pika.

http://docs.celeryproject.org/en/latest/userguide/extending.html#custom-message-consumers

Upvotes: 4

shipperizer

Reputation: 1669

Celery expects a specific message format, including a set of headers, and the worker discards anything that does not comply with it. To have a non-Celery producer feed a Celery worker, you would therefore have to reverse engineer that format and build a Celery-compliant message yourself. Keep in mind that Celery is not really designed for passing arbitrary messages through the broker; it sends tasks, which are enriched messages that carry extra information in the headers of the AMQP message.
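To give a rough idea of what "Celery-compliant" means, here is a sketch that assembles the pieces of a task message along the lines of Celery's documented message protocol version 2, using only the standard library. The helper name build_celery_v2_message is hypothetical, the task name celery_tasks.test is an assumption based on the module in the question, and whether a given worker version accepts a message with only these fields is not guaranteed.

```python
import json
import uuid


def build_celery_v2_message(task_name, args=(), kwargs=None):
    """Assemble body, headers and properties for a protocol-2 style task message."""
    kwargs = kwargs or {}
    task_id = str(uuid.uuid4())

    # The body is the JSON-encoded triple (args, kwargs, embed).
    embed = {'callbacks': None, 'errbacks': None, 'chain': None, 'chord': None}
    body = json.dumps([list(args), kwargs, embed])

    # Task metadata travels in the AMQP headers, not in the body.
    headers = {
        'lang': 'py',
        'task': task_name,
        'id': task_id,
        'root_id': task_id,
        'parent_id': None,
        'group': None,
        'argsrepr': repr(tuple(args)),
        'kwargsrepr': repr(kwargs),
    }

    # Plain AMQP message properties.
    properties = {
        'correlation_id': task_id,
        'content_type': 'application/json',
        'content_encoding': 'utf-8',
    }
    return {'body': body, 'headers': headers, 'properties': properties}


# Assumed task name: module "celery_tasks", function "test".
message = build_celery_v2_message('celery_tasks.test', args=('abc',))
```

The resulting body, headers and properties could then be handed to any AMQP client, for example pika's basic_publish with a BasicProperties object carrying the headers, correlation_id, content_type and content_encoding. Compare this with the rejected message in the question, which had empty headers and no content type.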

Upvotes: 4
