Manuel

Reputation: 802

Multithread celery worker for task division

I'm currently building an application that, based on some input, runs some scans.

The issue I'm encountering is that some of the scans create a bottleneck, and I was wondering if there is a way to run these tasks on a different thread/worker.

I'll elaborate a little bit more.

I start my worker with the command

pipenv run celery -A proj worker -B -l info

### Tasks.py ###

@shared_task
def short_task_1():
    return

@shared_task
def short_task_2():
    return

@shared_task
def long_task_1():
    return

### handler.py ###
def handle_scan():
    short_task_1.delay()
    short_task_2.delay()
    long_task_1.delay()

A possible solution I found is assigning the short tasks to one worker and the longer ones to another. But I can't find in the docs how to define which worker a task is assigned to when calling delay().

Will having another worker to handle these tasks help? If another thread is the solution, what's the best way of doing it?

Upvotes: 3

Views: 5152

Answers (1)

Manuel

Reputation: 802

I ended up doing the following

delay() does not work here because it cannot take routing options; it only forwards the task arguments, so the task always ends up on the default queue. To send a task to a specific queue at the call site, apply_async() must be used.

For example, a task that was called with .delay(arg1, arg2) now (with multiple queues in mind) needs to be called with .apply_async(args=[arg1, arg2], queue='queue_name').
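In other words, .delay() is just a shortcut for .apply_async() with no execution options, which is why it cannot pick a queue. A quick illustration using the tasks above (the queue name is only an example):

# Both lines enqueue the same task; only apply_async() can say where it goes.
short_task_1.delay()                          # always goes to the default queue
short_task_1.apply_async(queue='fast_queue')  # explicitly routed to fast_queue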

So, here is how I finally did it, thanks to @DejanLekic.

### tasks.py ###

@shared_task
def short_task_1():
    return

@shared_task
def short_task_2():
    return

@shared_task
def long_task_1():
    return

Same as before. But here is the new handler:

### handler.py ###

def handle_scan():
    # Short scans go to the fast queue
    # (pass arguments with args=[...] if a task takes any)
    short_task_1.apply_async(queue='fast_queue')
    short_task_2.apply_async(queue='fast_queue')
    # Long scans go to the slow queue
    long_task_1.apply_async(queue='slow_queue')
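An alternative worth mentioning: Celery can also route tasks by name through the task_routes setting, so .delay() keeps working and the queue does not have to be repeated at every call site. A minimal sketch, assuming the tasks live in proj/tasks.py (adjust the names to your project layout):

# Wherever the Celery app is configured (e.g. proj/celery.py)
app.conf.task_routes = {
    'proj.tasks.short_task_1': {'queue': 'fast_queue'},
    'proj.tasks.short_task_2': {'queue': 'fast_queue'},
    'proj.tasks.long_task_1': {'queue': 'slow_queue'},
}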

I start the workers by doing the following (mind the pipenv):

pipenv run celery -A proj worker -B --loglevel=info -Q slow_queue,fast_queue
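If the aim is true isolation, so the long scans can never block the short ones, the same queues can instead be consumed by two separate worker processes, one per queue. A sketch, where the -n node names are arbitrary labels:

pipenv run celery -A proj worker -B -l info -Q fast_queue -n fast@%h
pipenv run celery -A proj worker -l info -Q slow_queue -n slow@%h

Only one of the workers should carry -B, since the beat scheduler must run exactly once.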

Upvotes: 2
