Reputation:
I'm trying to build a distributed job execution system with celery.
When I launch 2 workers on a single machine (localhost), in which one for an addition task add
and the other for a subtraction task sub
, then use add.delay()
to kick off sevral addition tasks, there's an error in the subtraction worker's terminal:
[2013-03-05 15:51:18,898: ERROR/MainProcess] Received unregistered task of type 'add_tasks.add'.
In this test, I kicked off 2 addition tasks: one is caught by the addition worker while the other caught by the subtraction worker, which caused the error above. How could I change the configurations so that the second addition task won't be caught by the subtraction worker? Thanks.
Here's the code:
add_tasks.py:
celery = Celery('add_tasks', backend='amqp', broker='amqp://guest@localhost//')
@celery.task
def add(x, y):
sleep(20)
return x + y
sub_tasks.py:
celery = Celery('sub_tasks', backend='amqp', broker='amqp://guest@localhost//')
@celery.task
def sub(x, y):
sleep(10)
return x - y
I launched the workers by celery -A add_tasks worker --loglevel=info -n worker1
and celery -A sub_tasks worker --loglevel=info -n worker2
in two terminals of localhost machine.
Upvotes: 2
Views: 560
Reputation:
Finally I found the ROUTER
feature could solve my problem. I put my solution here and hope it would be useful to others who have the same problems.
When launching a worker, we could use -Q queue
option to limit the worker with only accepting tasks in queue
. In my situation, I used celery -A add_tasks worker --loglevel=info -n worker1 -Q addition
.
And on the other hand, when kicking off a new task, we should indicate explicitly with queue argument, for example add.apply_async(queue='addition',priority=0,args=[1,4])
and sub.apply_async(queue='subtraction',priority=0,args=[1,4])
. Then the addition task won't be accepted by the subtraction worker.
Upvotes: 4