Reputation: 36473
I have a logger in Celery (with RabbitMQ) and want to duplicate its work for emergency cases.
# tasks.py
@task
def log(message):
with open('test.txt', 'a') as f:
f.write(message)
# views.py
log.delay(message)
How can I make 2 instances of Celery on different machines run log()
when it's called?
Does it make sense at all to do this?
This is possible in RabbitMQ. If you have a topic-based exchange, it's clear that one message can be put into two different queues and delivered independently to 2 receivers.
sender =>
[message, routing_key=event.logging.log] => [queue A, topic=event.#]
=> receiver 1
=> [queue B, topic=*.logging.*]
=> receiver 2
The message will be sent to both queues, and none of them will steal the message from another one.
Upvotes: 1
Views: 479
Reputation: 19499
To do this you would have to configure the exchange to be a topic exchange (as you say):
CELERY_QUEUES = {
'celery': {
'exchange': 'celerytopic',
'exchange_type': 'topic',
'routing_key': 'celery',
},
}
Then you can create your backup exchange using the AMQP api:
from celery import current_app as celery
with celery.broker_connection() as conn:
conn.default_channel.queue_declare(queue='celery.backup', durable=True)
conn.default_channel.queue_bind(queue='celery.backup',
exchange='celerytopic',
routing_key='celery',
durable=True)
Since you already have a queue named celery you may have to delete that first:
$ camqadm queue.delete celery
Upvotes: 1
Reputation: 8482
It doesn't make sense for me to try to start this task on two different machines. At least Celery cannot guarantee that a task will be run on different machines - it is RabbitMQ that distributes load, and if one node is less loaded than other - the two tasks run will be probably executed on that machine...
Use task.retry
instead. Celery will retry a task, if it fails to execute. Celery is smart enough to understand if a task had failed. Just make sure to raise some exception if the tasks fail, and not to return silently if it cannot successfully log.
UPDATE:
A possible workflow could be - try to execute the task, if it fails, in on_retry change the routing_key, and try to execute task in a different exchange/queue which can be your fail-over queue.
Upvotes: 1