Onilol

Reputation: 1339

Avoid concurrent access to same queue element

I'm reviewing/refactoring a queue that will be used internally by up to 20 people simultaneously, but as it stands, multiple people can be handed the same first element (we verified this locally by clicking the link at the same time).

The flow is roughly this:

views.py

[GET]
def list_operator(request, id):
  if request.method == 'POST':
    utilitary = Utilitary(id)
    pool = ThreadPool(processes=1)
    async_result = pool.apply_async(utilitary.recover_people, (id, ))
    return_val = async_result.get()
    person = People.objects.get(pk=return_val)
    return redirect('people:people_update', person.pk)

utilitary.py

This file has the method recover_people which performs around 4-5 queries (where people have flag_allocated=False) across multiple tables and sorts a list, to return the first element. The final step is this one:

for person in people:
  p_address = People_Address.objects.get(person_id=person.id)
  p_schedule = Schedules.objects.get(schedules_state=p_address.p_state)

  if datetime.now() > parse_time(p_schedule.schedules_hour):
    person = People.objects.get(pk=person.id)
    person.flag_allocated = True
    person.date_of_allocation = datetime.now()
    person.save()
    return person.pk

Perhaps something in the Utilitary method's logic is wrong? Or should I expect this problem with this many people calling the method simultaneously?

Could using a cache help? Sorry, I'm new to Django and MVC.

Upvotes: 3

Views: 735

Answers (3)

Gabriel Menezes

Reputation: 119

In this case, re-selecting the "person" from the database just to flag it in memory and then write it back seems like a lot of work, and more importantly the read-modify-write is not atomic.

Another process can lock or change the record between these steps:

person = People.objects.get(pk=person.id)
person.flag_allocated = True
person.date_of_allocation = datetime.now()
person.save()

That is where your problem is. But if you update the record directly in the database with a condition so the UPDATE only writes to a row where flag_allocated=False, you just have to check whether your update affected any row. If it didn't, you move on to the next person in the queue.

Something like:

for person in people:
    rows = People.objects.filter(pk=person.id, flag_allocated=False).update(
        flag_allocated=True,
        date_of_allocation=datetime.now(),  # keep the timestamp set in the original code
    )
    if rows:
        break  # Got the person... and nobody else will.

The UPDATE locks the record while it writes flag_allocated (standard SQL behavior). If two updates race for the same row, one gets there first and the other updates nothing, so it moves on to the next person.
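The same conditional-write idea can be sketched in plain SQL with the stdlib sqlite3 module; the table and column names below merely mirror the question and are not the real schema:

```python
# Toy demo of "update with a condition, then check affected rows".
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE people (id INTEGER PRIMARY KEY, flag_allocated INTEGER)")
conn.execute("INSERT INTO people VALUES (1, 0)")
conn.commit()

def try_claim(conn, person_id):
    cur = conn.execute(
        "UPDATE people SET flag_allocated = 1 "
        "WHERE id = ? AND flag_allocated = 0",
        (person_id,),
    )
    conn.commit()
    return cur.rowcount == 1  # True only for whoever got there first

first = try_claim(conn, 1)   # claims the row
second = try_claim(conn, 1)  # row already claimed, affects nothing
```

Whichever caller runs the UPDATE first wins; every later caller sees zero affected rows and tries the next person.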

Upvotes: 2

bruno desthuilliers

Reputation: 77942

The canonical solution here would be to use a lock of some sort so that you cannot have two concurrent executions of utilitary.recover_people at all - the function waits until it acquires the lock, executes, and releases the lock.

Given that Django is typically served by multiple processes (and you certainly don't want to change that), and that you don't want a failed call to hold the lock forever, a good solution here is to store the lock in something like redis (with all Django processes sharing the same redis db, of course), with an expiration set to a reasonable time so the lock cannot remain set forever.
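The pattern described here - a shared lock with a TTL - is what redis gives you with `SET key value NX EX seconds` (in redis-py, `r.set(name, token, nx=True, ex=ttl)`). Below is a toy, single-process stand-in for that behavior, just to illustrate the acquire/expire/release semantics; all names are made up for the sketch, and in production the store would be a shared redis instance, not a Python dict:

```python
# Toy in-process stand-in for a shared lock with expiration.
import threading
import time

class ExpiringLock:
    """Mimics redis SET ... NX EX semantics: acquire fails while the key
    is held and unexpired; an expired key can be re-acquired."""

    def __init__(self):
        self._store = {}            # key -> (owner, deadline)
        self._guard = threading.Lock()

    def acquire(self, key, owner, ttl):
        now = time.monotonic()
        with self._guard:
            held = self._store.get(key)
            if held is not None and held[1] > now:
                return False        # held by someone else and not expired
            self._store[key] = (owner, now + ttl)
            return True

    def release(self, key, owner):
        with self._guard:
            held = self._store.get(key)
            if held is not None and held[0] == owner:
                del self._store[key]  # only the owner may release
```

A caller would wrap recover_people in acquire/release, and the TTL guarantees a crashed caller cannot keep the lock forever.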

There are examples of such setups using celery (not that you necessarily need celery here; the principle is the same, since avoiding concurrent tasks stepping on each other is a common use case with celery).

You could also use your SQL database to store the lock, but then you don't get automatic expiration...

Upvotes: 2

erik258

Reputation: 16302

If you're not doing anything to prevent interference between concurrent processes, they're bound to step on each others' toes sooner or later.

A time-honored approach to modeling a queue in a database is to register a worker to a particular job, in a transactionally consistent manner, before executing the job.

Say you have a table `work` with columns for a job id or specification, an initially NULL status, and an initially NULL worker. A worker can "register" for a job by running an update such as:

    UPDATE `work` SET worker = my_worker_id, status = 'initializing'
    WHERE status IS NULL AND worker IS NULL LIMIT 1;

Only one worker can "register" for the next job, thanks to the WHERE clause.

This isn't perfect - you still have to handle jobs orphaned by a failed worker. The status column, updated on job completion, combined with some kind of heartbeat for the workers and careful design around job idempotency, would give you the primitives to ensure jobs don't get stuck on a failed or AWOL worker.
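The registration step can be sketched end-to-end with the stdlib sqlite3 module. The toy schema mirrors the `work` table above; since sqlite doesn't support UPDATE ... LIMIT out of the box, a subquery picks the row to claim:

```python
# Toy work queue: each worker atomically claims one unassigned job.
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE work (id INTEGER PRIMARY KEY, status TEXT, worker TEXT)")
conn.executemany("INSERT INTO work (id) VALUES (?)", [(1,), (2,)])
conn.commit()

def register(conn, worker_id):
    """Assign one unclaimed job to worker_id; return its id, or None if none left."""
    cur = conn.execute(
        "UPDATE work SET worker = ?, status = 'initializing' "
        "WHERE id = (SELECT id FROM work "
        "            WHERE status IS NULL AND worker IS NULL LIMIT 1)",
        (worker_id,),
    )
    conn.commit()
    if cur.rowcount == 0:
        return None  # nothing left to claim
    row = conn.execute(
        "SELECT id FROM work WHERE worker = ? AND status = 'initializing'",
        (worker_id,),
    ).fetchone()
    return row[0]
```

Two workers calling register get different jobs, and a third gets None once the queue is drained.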

Upvotes: 2
