Reputation: 380
How do I make Celery send a task to the right worker when using send_task?
For instance, given the following services:
service_add.py
from celery import Celery

celery = Celery('service_add', backend='redis://localhost', broker='pyamqp://')

@celery.task
def add(x, y):
    return x + y
service_sub.py
from celery import Celery

celery = Celery('service_sub', backend='redis://localhost', broker='pyamqp://')  # Redis backend, RabbitMQ for messaging

@celery.task
def sub(x, y):
    return x - y
the following code is guaranteed to fail:
main.py
from celery.execute import send_task

result1 = send_task('service_sub.sub', (1, 1)).get()
result2 = send_task('service_sub.sub', (1, 1)).get()
It fails with the exception celery.exceptions.NotRegistered: 'service_sub.sub', because Celery delivers the tasks to the workers in round-robin fashion, even though service_sub is registered in only one of the two processes.
For the question to be complete, here's how I ran the processes and the config file:
celery -A service_sub worker --loglevel=INFO --pool=solo -n worker1
celery -A service_add worker --loglevel=INFO --pool=solo -n worker2
celeryconfig.py
## Broker settings.
broker_url = 'pyamqp://'
# List of modules to import when the Celery worker starts.
imports = ('service_add', 'service_sub')
Upvotes: 0
Views: 749
Reputation: 11347
If you're using two different apps service_add / service_sub only to achieve routing of tasks to dedicated workers, I would like to suggest another solution (below). If that's not the case and you still need two (or more) apps, I would suggest encapsulating each one better, with its own broker vhost like amqp://localhost:5672/add_vhost and its own backend like redis://localhost/1. Having a dedicated vhost in RabbitMQ and a dedicated database id (1) in Redis might do the trick.
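For illustration, a minimal sketch of that isolation (the vhost names and Redis database ids here are my assumptions, and each vhost has to exist in RabbitMQ first, e.g. via rabbitmqctl add_vhost add_vhost):

from celery import Celery

# Each app gets its own RabbitMQ vhost and its own Redis database,
# so the two services never see each other's messages or results.
add_app = Celery('service_add',
                 broker='pyamqp://localhost:5672/add_vhost',
                 backend='redis://localhost/1')
sub_app = Celery('service_sub',
                 broker='pyamqp://localhost:5672/sub_vhost',
                 backend='redis://localhost/2')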
Having said that, I think the right solution in such a case is to use a single Celery application (rather than splitting it into two) and a router:
task_routes = {
    'tasks.service_add': {'queue': 'add'},
    'tasks.service_sub': {'queue': 'sub'},
}
add it to the configuration:
app.conf.task_routes = task_routes
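Put together, a sketch of what the single app might look like (the module and task names are my assumptions; the route keys must match the actual task names, which here live directly in shared_app.py):

shared_app.py

from celery import Celery

app = Celery('shared_app', backend='redis://localhost', broker='pyamqp://')

# Route each task to its own queue so only the dedicated worker consumes it.
app.conf.task_routes = {
    'shared_app.add': {'queue': 'add'},
    'shared_app.sub': {'queue': 'sub'},
}

@app.task
def add(x, y):
    return x + y

@app.task
def sub(x, y):
    return x - y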
and run each worker with -Q (which queue to consume messages from):
celery -A shared_app worker --loglevel=INFO --pool=solo -n worker1 -Q add
celery -A shared_app worker --loglevel=INFO --pool=solo -n worker2 -Q sub
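Dispatching then works through send_task on the shared app, and every message lands on the queue the router assigned to it. A sketch, assuming the shared_app.py layout above:

from shared_app import app

# Routed to the 'sub' queue, so only worker2 will pick it up.
result = app.send_task('shared_app.sub', (1, 1))
print(result.get())  # 0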
Note that this approach has additional benefits, for example when you want to build dependencies between tasks (canvas).
There are more ways to define routes; you can read more about them in the Celery routing documentation.
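For example, task_routes also accepts a router function, so the queue can be computed dynamically (a sketch; the name-matching logic is my assumption):

def route_task(name, args, kwargs, options, task=None, **kw):
    # Pick a queue from the task name; returning None would fall through
    # to the next router or the default queue.
    return {'queue': 'sub'} if name.endswith('.sub') else {'queue': 'add'}

app.conf.task_routes = (route_task,)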
Upvotes: 1