Reputation: 11295
My Django application currently takes in a file and reads it line by line; for each line, a Celery task is dispatched to process that line.
Here's roughly what it looks like:
File -> For each line in file -> celery_task.delay(line)
I also have other Celery tasks that can be triggered by the user, for example:
User input line -> celery_task.delay(line)
This of course isn't strictly the same task; the user can in essence invoke any Celery task depending on what they do (signals invoke some tasks as well).
The problem I'm facing is that when a user uploads a relatively large file, my Redis queue gets clogged with the tasks for processing that file; when the user then does anything, their task is dispatched but only executed after the file's celery_task.delay() tasks have finished. My question is: is it possible to reserve a set number of workers, or to dispatch a Celery task with a "higher" priority that jumps ahead of the queue?
Here's in general what the code looks like:
@app.task(name='process_line')
def process_line(line):
    some_stuff_with_line(line)
    do_heavy_logic_stuff_with_line(line)
    more_stuff_here(line)
    obj = Data.objects.make_data_from_line(line)
    serialize_object.delay(obj.id)
    return obj.id

@app.task(name='serialize_object')
def serialize_object(important_id):
    obj = Data.objects.get(id=important_id)
    obj.pre_serialized_json = DataSerializer(obj).data
    obj.save()

@app.task(name='process_file')
def process_file(file_id):
    ingested_file = IngestedFile.objects.get(id=file_id)
    for line in ingested_file.get_all_lines():
        process_line.delay(line)
Upvotes: 0
Views: 1086
Reputation: 39659
Yes, you can create multiple queues and then route your tasks to those queues, which can be consumed by multiple workers or a single worker. By default, all tasks go to the default queue, which is named celery. Check the Celery documentation on Routing Tasks for more information and some examples.
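As a rough sketch (assuming a Celery 4+ style configuration and the task names from the question; the queue name file_ingest and the proj module path are made up for illustration), you could declare a separate queue for the bulk file-processing tasks and route them there, leaving everything else on the default queue:

from kombu import Queue

# Keep user-facing tasks on the default queue; push bulk file work onto
# its own queue so it cannot starve interactive tasks.
app.conf.task_default_queue = 'default'
app.conf.task_queues = (
    Queue('default'),
    Queue('file_ingest'),   # hypothetical queue name for file processing
)
app.conf.task_routes = {
    'process_file': {'queue': 'file_ingest'},
    'process_line': {'queue': 'file_ingest'},
    'serialize_object': {'queue': 'file_ingest'},
    # anything not listed here still goes to 'default'
}

Then run one worker (or pool) per queue so the file work has its own capacity and user tasks are picked up immediately:

celery -A proj worker -Q default -c 4 -n default@%h
celery -A proj worker -Q file_ingest -c 2 -n ingest@%h

You can also choose the queue at call time instead of via routing config, e.g. process_line.apply_async(args=(line,), queue='file_ingest').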
Upvotes: 1