Reputation: 1087
I am working on a project using Celery for distributing tasks. In order to route a task to a specific worker (because it needs specific files, created by a previous task), I am trying to use celery.utils.worker_direct.
What I'm doing is basically this:
@app.task(bind=True)
def task_A(self, arg):
worker = str(self.request.hostname)
# ...
s = task_B.s(arg1, worker)
s.delay()
@app.task
def task_B(arg1, worker):
task_C.apply_async((arg1, arg2), queue=worker_direct(worker))
@app.task
def task_C(arg1, arg2):
pass
When task_C.apply_async((arg1, arg2), queue=worker_direct(worker))
is executed, I get this error:
TypeError: object of type 'Queue' has no len()
What am I doing wrong?
Upvotes: 0
Views: 402
Reputation: 1087
Found a solution:
# task_A
worker = worker_direct(self.request.hostname).name
# task_B
task_C.apply_async((arg1, arg2), queue=worker)
Upvotes: 2