Reputation: 802
Im currently building an application that, based on some input, runs some scans.
The issue i'm encountering is that a bottleneck is being creating on some of the scans, I was wondering if there was a way to implement a different thread/worker for these tasks.
I'll ellaborate a little bit more.
I start my worker with the command
pipenv run celery -A proj worker -B -l info
### Tasks.py ###
@shared_task
def short_task_1():
return
@shared_task
def short_task_2():
return
@shared_task
def long_task_1():
return
### handler.py ###
def handle_scan():
short_task_1.delay()
short_task_2.delay()
long_task_1.delay()
A possible solution I find is assigning the short tasks to one worker and the longer ones to the other. But I cant find in the docs how to define which worker the task is assigned to with the delay()
command.
Will having another worker for handling this tasks help? If another thread is the solution, whats the best way of doing it?
Upvotes: 3
Views: 5152
Reputation: 802
I ended up doing the following
delay()
does not work if you are trying to use multiple task queues. Mainly because delay()
is only used if the "default" queue is used. For using multiple queues, apply_async()
must be used.
For example, if a task was called with .delay(arg1, arg2)
Now (With multiple queues in mind) it needs to be called with .apply_async(args=[arg1,arg2], queue='queue_name')
So, here is how I did it finally, thanks to @DejanLekic
tasks.py
@shared_task
def short_task_1():
return
@shared_task
def short_task_2():
return
@shared_task
def long_task_1():
return
Same as before. But here is the new handler
def handle_scan():
# Fast queue with args if required
short_task_1.apply_async(args=[arg1, arg2], queue='fast_queue')
short_task_2.apply_async(args=[arg1, arg2], queue='fast_queue')
# slow queue
long_task_1.apply_async(args=[arg1, arg2], queue='slow_queue')
I start the workers by doing the following (mind the pipenv):
pipenv run celery -A proj worker -B --loglevel=info -Q slow_queue,fast_queue
Upvotes: 2