Davide R.

Reputation: 900

Increment a counter and trigger an action when a threshold is exceeded

I have a model like this

class Thingy(models.Model):
    # ...
    failures_count = models.IntegerField()

I have concurrent processes (Celery tasks) that need to do this:

  1. do some kind of processing
  2. if the processing fails, increment failures_count of the respective Thingy
  3. if failures_count exceeds a threshold for some Thingy, issue a warning, but ONLY ONE warning.
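A plain-Python sketch (no Django; threads and a dict stand in for the Celery tasks and the database) of why a naive read-check-write version of steps 2 and 3 races — both workers read the same value, both see the threshold transition, and one increment is lost:

```python
import threading

THRESHOLD = 3
state = {"failures_count": THRESHOLD - 1}   # one failure short of the threshold
warnings = []
barrier = threading.Barrier(2)

def report_failure_naive():
    current = state["failures_count"]        # step 2: read the counter
    barrier.wait()                           # force both workers to read before either writes
    if current + 1 == THRESHOLD:             # step 3: threshold check
        warnings.append("warned")
    state["failures_count"] = current + 1    # write back: the two updates collide

workers = [threading.Thread(target=report_failure_naive) for _ in range(2)]
for w in workers:
    w.start()
for w in workers:
    w.join()

print(len(warnings))                # 2 -- the "ONLY ONE warning" rule is violated
print(state["failures_count"])      # 3 -- one increment was lost
```

The barrier makes the interleaving deterministic for the demo; in production the same interleaving simply happens occasionally.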

I have some ideas about how to do this without a race condition, for example using explicit locks (via select_for_update):

@transaction.commit_on_success
def report_failure(thingy_id):
    current, = (Thingy.objects
               .select_for_update()
               .filter(id=thingy_id)
               .values_list('failures_count'))[0]
    if current == THRESHOLD:
        issue_warning_for(thingy_id)
    Thingy.objects.filter(id=thingy_id).update(
        failures_count=F('failures_count') + 1
    )

Or by using Redis (it's already there) for synchronization:

@transaction.commit_on_success
def report_failure(thingy_id):
    Thingy.objects.filter(id=thingy_id).update(
        failures_count=F('failures_count') + 1
    )
    value = Thingy.objects.only('failures_count').get(id=thingy_id).failures_count
    if value >= THRESHOLD:
        if redis.incr('issued_warning_%s' % thingy_id) == 1:
            issue_warning_for(thingy_id)

Both solutions use locks. As I'm using PostgreSQL, is there a way to achieve this without locking?


I'm editing the question to include the answer (thanks to Sean Vieira, see answer below). The question asked about a way to avoid locking and this answer is optimal in that it leverages multi-version concurrency control (MVCC) as implemented by PostgreSQL.

This specific question explicitly allowed using PostgreSQL features, and though many RDBMSs implement UPDATE ... RETURNING, it is not standard SQL and is not supported by Django's ORM out of the box, so it requires raw SQL via raw(). The same SQL statement will work in other RDBMSs, but every engine requires its own discussion of synchronization, transaction isolation and concurrency models (e.g. MySQL with MyISAM would still use locks).

def report_failure(thingy_id):
    with transaction.commit_on_success():
        failures_count = Thingy.objects.raw("""
            UPDATE Thingy
            SET failures_count = failures_count + 1
            WHERE id = %s
            RETURNING id, failures_count;  -- raw() requires the primary key in the result set
        """, [thingy_id])[0].failures_count

    if failures_count == THRESHOLD:
        issue_warning_for(thingy_id)
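Outside Django, the same atomic increment-and-read can be sketched with the plain DB-API. SQLite also supports RETURNING (since 3.35), so a self-contained illustration is possible; the table and column names here are illustrative, not the real schema:

```python
import sqlite3

THRESHOLD = 3
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE thingy (id INTEGER PRIMARY KEY, failures_count INTEGER NOT NULL)")
conn.execute("INSERT INTO thingy (id, failures_count) VALUES (1, 0)")

def report_failure(thingy_id):
    # one atomic statement: increment and read back the new value
    (count,) = conn.execute(
        "UPDATE thingy SET failures_count = failures_count + 1 "
        "WHERE id = ? RETURNING failures_count",
        (thingy_id,),
    ).fetchone()
    return count == THRESHOLD       # True for exactly one caller

results = [report_failure(1) for _ in range(5)]
print(results)  # [False, False, True, False, False]
```

Because the increment and the read happen in one statement, exactly one caller observes the counter crossing the threshold, which is what makes the single-warning guarantee possible without an explicit lock.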

Upvotes: 3

Views: 1225

Answers (2)

Sean Vieira

Reputation: 159905

As far as I can tell, Django's ORM doesn't support this out of the box. However, that doesn't mean it can't be done; you just need to drop down to the SQL level (exposed in Django's ORM via a Manager's raw method) to make it work.

If you are using PostgreSQL >= 8.2, you can use RETURNING to get the final value of failures_count without any additional locking (the DB still takes a row lock, but only for as long as it takes to set the value; no extra time is spent communicating with the client):

# ASSUMPTIONS: All IDs are valid and IDs are unique
# More defenses are necessary if either of these assumptions
# are not true.
failure_count = Thingy.objects.raw("""
    UPDATE Thingy
    SET failure_count = failure_count + 1
    WHERE id = %s
    RETURNING failure_count;
""", [thingy_id])[0].failure_count

if failure_count == THRESHOLD:
    issue_warning_for(thingy_id)

Upvotes: 5

vutran

Reputation: 2175

I don't really know the reason you have to do this job without locking; how many tasks do you have running concurrently?

However, I think there is a way to do this without locking:

You should have another model, for example Failure:

class Failure(models.Model):
    thingy = models.ForeignKey(Thingy)

Your *report_failure* should look like this:

from django.db import transaction
@transaction.commit_manually
def flush_transaction():
    transaction.commit()

@transaction.commit_on_success
def report_failure(thingy_id):
    thingy = Thingy.objects.get(id=thingy_id)
    # uncomment the following line if you find the query returns cached (stale) results
    #flush_transaction()

    current = thingy.failure_set.count()
    if current >= THRESHOLD:
        issue_warning_for(thingy_id)
    Failure.objects.create(thingy=thingy)

I know this approach is quite bad because it creates a lot of Failure records, but it is the only idea I can come up with. Sorry about that.

Upvotes: 0
