allan.simon
allan.simon

Reputation: 4316

Celery: it ignores the exchange name when sending a task

I have a very simple code:

celery = Celery(broker="amqp://guest:[email protected]:5672/")                                                                                                                                                                                                           
celery.send_task(                                                                                                                                                                                                                            
    "robot.worker",                                                                                                                                                                                                                
    kwargs={},                                                                                                                                                                                                                               
    exchange="I_am_useless", # with  exchange=Exchange("I_am_useless") I got the same results                                                                                                                                                                                                          
)                                                                                                                                                                                                                                            

I really need the task to be sent to the exchange "I_am_useless", however it is not sent there, when I debug at the AMQP protocol level I see that a publish event is sent (which is ok) but with the following characteristic

reserved-1': 0,
'exchange-name': '',
'routing-key': 'celery',
'mandatory': False,
'immediate': False

so it seems the parameter is totally ignored, as even during the exchange declaration event, the name "celery" is used. but according to the documentation send_task takes the same parameters as https://docs.celeryproject.org/en/latest/reference/celery.app.task.html#celery.app.task.Task.apply_async , and we can see there's a "exchange" parameter that should accept a string.

however if I call kombu directly

rabbit_url = "amqp://guest:[email protected]:5672/"                                                                                                                                                                                           
conn = Connection(rabbit_url)                                                                                                                                                                                                               
channel = conn.channel()                                                                                                                                                                                                                    
exchange = Exchange("example-exchange", type="direct")                                                                                                                                                                                      
producer = Producer(exchange=exchange, channel=channel, routing_key="BOB")                                                                                                                                                                  
queue = Queue(name="example-queue", exchange=exchange, routing_key="BOB")                                                                                                                                                                   
queue.maybe_bind(conn)                                                                                                                                                                                                                      
queue.declare()                                                                                                                                                                                                                             
producer.publish("Hello there!")                                                                                                                                                                                                            

I correctly see the exchange name, so I'm wondering what I'm doing wrong ?

Upvotes: 6

Views: 2034

Answers (1)

Tomáš Linhart
Tomáš Linhart

Reputation: 10220

After looking into the code and a bit of experimenting, it seems that you need to also specify exchange_type and routing_key arguments when calling send_task.

Upvotes: 4

Related Questions