cwick

Reputation: 26632

How to tell if a task has already been queued in django-celery?

Here's my setup:

In my settings.py file I have

BROKER_BACKEND = "djkombu.transport.DatabaseTransport"

i.e. I'm just using the database to queue tasks.

Now on to my problem: I have a user-initiated task that could take a few minutes to complete. I want the task to only run once per user, and I will cache the results of the task in a temporary file so if the user initiates the task again I just return the cached file. I have code that looks like this in my view function:

import celery.states
from django.http import HttpResponse

task_id = "long-task-%d" % user_id
result = tasks.some_long_task.AsyncResult(task_id)

if result.state == celery.states.PENDING:
    # The next line makes a duplicate task if the user rapidly refreshes the page
    tasks.some_long_task.apply_async(task_id=task_id)
    return HttpResponse("Task started...")
elif result.state == celery.states.STARTED:
    return HttpResponse("Task is still running, please wait...")
elif result.state == celery.states.SUCCESS:
    if cached_file_still_exists():
        return get_cached_file()
    else:
        # The cached file is gone; forget the stale result and re-run the task
        result.forget()
        tasks.some_long_task.apply_async(task_id=task_id)
        return HttpResponse("Task started...")

This code almost works. But I'm running into a problem when the user rapidly reloads the page. There's a 1-3 second delay between when the task is queued and when the task is finally pulled off the queue and given to a worker. During this time, the task's state remains PENDING which causes the view logic to kick off a duplicate task.

What I need is some way to tell if the task has already been submitted to the queue so I don't end up submitting it twice. Is there a standard way of doing this in celery?

Upvotes: 20

Views: 8960

Answers (3)

user67416

I don't think (as Tomek and others have suggested) that using the database is the way to do this locking. Django has a built-in cache framework, which should be sufficient to accomplish this locking, and is much faster. See:

http://docs.celeryproject.org/en/latest/tutorials/task-cookbook.html#cookbook-task-serial

Django can be configured to use memcached as its cache backend, and memcached can be distributed across multiple machines ... this seems better to me. Thoughts?
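For reference, the cookbook's lock looks roughly like this (a minimal sketch, not the asker's code: the task import matches the Celery 2.x era, generate_cached_file is a hypothetical stand-in for the real work, and the atomicity of cache.add depends on using a backend like memcached):

from django.core.cache import cache
from celery.task import task  # Celery 2.x-era import

LOCK_EXPIRE = 60 * 5  # let the lock expire eventually if a worker dies

@task
def some_long_task(user_id):
    lock_id = "long-task-lock-%d" % user_id
    # cache.add is atomic (with memcached): it stores the key only if it
    # does not already exist, so exactly one caller acquires the lock
    if cache.add(lock_id, "locked", LOCK_EXPIRE):
        try:
            generate_cached_file(user_id)  # hypothetical: the real work
        finally:
            cache.delete(lock_id)

Because the lock has a timeout, a crashed worker can't wedge things forever; the lock simply expires.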

Upvotes: 5

user250145

I solved this with Redis. Just set a key in Redis for each task, then remove the key in the task's after_return method. Redis is lightweight and fast.
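A minimal sketch of that idea (the key names and the LockedTask class are illustrative, and a Redis server on localhost is assumed):

import redis
from celery.task import Task

redis_client = redis.Redis()  # assumes Redis on localhost

class LockedTask(Task):
    def after_return(self, status, retval, task_id, args, kwargs, einfo):
        # Runs after the task finishes, whether it succeeded or failed,
        # so the lock key never outlives the task
        redis_client.delete("task-lock:%s" % task_id)

And in the view, queue the task only if the key was absent:

task_id = "long-task-%d" % user_id
# SETNX is atomic: it sets the key only if it isn't already there,
# so only the first of several rapid requests queues the task
if redis_client.setnx("task-lock:%s" % task_id, "queued"):
    tasks.some_long_task.apply_async(task_id=task_id)

The task itself would use LockedTask as its base class (e.g. @task(base=LockedTask)) so the key is cleared even when the task raises.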

Upvotes: 5

Tomek Kopczuk

Reputation: 2113

You can cheat a bit by storing the result manually in the database. Let me explain how this will help.

For example, if you're using an RDBMS (a table with columns task_id, state, result):

View part:

  1. Use transaction management.
  2. Use SELECT FOR UPDATE to fetch the row where task_id == "long-task-%d" % user_id. SELECT FOR UPDATE blocks other requests until this one COMMITs or ROLLBACKs.
  3. If the row doesn't exist, set the state to PENDING, start some_long_task, and end the request.
  4. If the state is PENDING, inform the user that the task is still running.
  5. If the state is SUCCESS, set the state back to PENDING, start the task, and return the file pointed to by the result column. I base this on the assumption that you want to re-run the task after returning the result. COMMIT.
  6. If the state is ERROR, set the state to PENDING, start the task, and inform the user. COMMIT. (See the sketch below.)
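A rough sketch of that view logic, assuming a hypothetical TaskState model with task_id, state and result columns (note that select_for_update() only exists in Django's ORM from 1.4 on; older versions need raw SQL, and commit_on_success is the pre-1.6 spelling of transaction.atomic):

from django.db import transaction
from django.http import HttpResponse

@transaction.commit_on_success
def start_long_task(request, user_id):
    task_id = "long-task-%d" % user_id
    # SELECT ... FOR UPDATE: concurrent requests for the same row block
    # here until this transaction commits or rolls back
    rows = list(TaskState.objects.select_for_update().filter(task_id=task_id))
    if not rows:
        # Step 3: no row yet - mark it PENDING and queue the task
        TaskState.objects.create(task_id=task_id, state="PENDING")
        tasks.some_long_task.apply_async(args=[user_id], task_id=task_id)
        return HttpResponse("Task started...")
    row = rows[0]
    if row.state == "PENDING":
        # Step 4: already queued or running
        return HttpResponse("Task is still running, please wait...")
    # Steps 5 and 6: SUCCESS or ERROR - re-queue either way
    old_state, row.state = row.state, "PENDING"
    row.save()
    tasks.some_long_task.apply_async(args=[user_id], task_id=task_id)
    if old_state == "SUCCESS":
        return get_cached_file(row.result)  # hypothetical helper
    return HttpResponse("Previous run failed; task restarted...")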

Task part:

  1. Prepare the file, wrapping the work in a try/except block.
  2. On success, UPDATE the row with state = SUCCESS and the result.
  3. On failure, UPDATE the row with state = ERROR.
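And the matching task side under the same assumptions (build_the_file is a hypothetical helper that does the slow work):

from celery.task import task

@task
def some_long_task(user_id):
    task_id = "long-task-%d" % user_id
    try:
        path = build_the_file(user_id)  # hypothetical: the slow work
    except Exception:
        # Step 3: record the failure so the view can offer a retry
        TaskState.objects.filter(task_id=task_id).update(state="ERROR")
        raise
    # Step 2: record success and where the result file lives
    TaskState.objects.filter(task_id=task_id).update(state="SUCCESS", result=path)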

Upvotes: 1
