Stupid.Fat.Cat
Stupid.Fat.Cat

Reputation: 11295

Can you reserve a set amount of celery workers for specific tasks or set a task to higher priority when you delay it?

My django application currently takes in a file, and reads in the file line by line, for each line there's a celery task that delegates processing said line.

Here's kinda what it look slike

File -> For each line in file -> celery_task.delay(line)

Now then, I also have other celery tasks that can be triggered by the user for example:

User input line -> celery_task.delay(line)

This of course isn't strictly the same task, the user can in essence invoke any celery task depending on what they do (signals also invoke some tasks as well)

Now the problem that I'm facing is, when a user uploads a relatively large file, my redis queue gets boggled up with processing the file, when the user does anything, their task will be delegated and executed only after the file's celery_task.delay() tasks are done executing. My question is, is it possible to reserve a set amount of workers or delay a celery task with a "higher" priority and overwrite the queue?

Here's in general what the code looks like:

@app.task(name='process_line')
def process_line(line):
    some_stuff_with_line(line)
    do_heavy_logic_stuff_with_line(line)
    more_stuff_here(line)
    obj = Data.objects.make_data_from_line(line)
    serialize_object.delay(obj.id)
    return obj.id

@app.task(name='serialize_object')
def serialize_object(important_id):
    obj = Data.objects.get(id=important_id)
    obj.pre_serialized_json = DataSerializer(obj).data
    obj.save()

@app.task(name='process_file')
def process_file(file_id):
    ingested_file = IngestedFile.objects.get(id=file_id)
    for line in ingested_file.get_all_lines():
        process_line.delay(line)

Upvotes: 0

Views: 1086

Answers (1)

Aamir Rind
Aamir Rind

Reputation: 39659

Yes you can create multiple queues, and then you can decide to route your tasks to those queues running on multiple workers or single worker. By default all tasks go to default queue named as celery. Check Celery documentation on Routing Tasks to get more information and some examples.

Upvotes: 1

Related Questions