Reputation: 1930
I use a Celery task in my Django project with a lock as described in this article. It works great, but my task creates an object and I don't want that the lock was released before the object is committed in the database. How can I change this context manager to wait until the objects in the task are committed?
@contextmanager
def lock(lock_id, oid, expire=600):
timeout_at = monotonic() + expire - 3
status = cache.add(lock_id, oid, expire)
try:
yield status
finally:
if monotonic() < timeout_at:
cache.delete(lock_id)
@celery.task(bind=True, ignore_result=True)
def my_task(self, object_id):
with lock('my_task.{}'.format(object_id), self.app.oid) as acquired, transaction.atomic():
if not acquired:
self.retry(countdown=1)
def on_commit():
# release the lock only in this moment
pass
transaction.on_commit(on_commit)
MyModel.objects.create(object_id=object_id)
Upvotes: 1
Views: 1726
Reputation: 1930
This context manager creates a lock and wraps a body in a transaction. It releases the lock only when the transaction was committed or an exception (except celery.exceptions.Retry
) was raised.
As noted in Celery docs:
In order for this to work correctly you need to be using a cache backend where the .add operation is atomic. memcached is known to work well for this purpose.
from celery.exceptions import Retry
from contextlib import contextmanager
from time import monotonic
from django.core.cache import cache
from django.db import transaction
@contextmanager
def lock_transaction(lock_id, oid, expire=600):
status = cache.add(lock_id, oid, expire)
timeout_at = monotonic() + expire - 3
is_retry = False
def on_commit():
if not is_retry and monotonic() < timeout_at:
cache.delete(lock_id)
with transaction.atomic():
transaction.on_commit(on_commit)
try:
yield status
except Retry as e:
is_retry = True
except:
if monotonic() < timeout_at:
cache.delete(lock_id)
raise
An example of using:
@celery.task(bind=True, ignore_result=True, max_retries=90, time_limit=60)
def create_or_add_counter_task(self, object_id):
with lock_transaction('object_id.{}'.format(object_id), self.app.oid) as acquired:
if not acquired:
self.retry(countdown=1)
try:
obj = MyModel.objects.get(object_id=object_id)
obj.counter += 1
obj.save()
except MyModel.DoesNotExist:
MyModel.objects.create(object_id=object_id)
Upvotes: 1