Julio

Reputation: 2523

Celery, periodic task execution, with concurrency

I would like to launch a periodic task every second, but only if the previous run has finished (the task polls a database and dispatches work to Celery). In the Celery documentation, they use the Django cache to implement a lock.

I tried to use the example:

from __future__ import absolute_import, print_function

import datetime
import time

from celery import shared_task
from django.core.cache import cache

LOCK_EXPIRE = 60 * 5  # lock expires after 5 minutes

@shared_task
def periodic():
    # cache.add only sets the key if it does not already exist,
    # so it can serve as a lock shared between processes
    acquire_lock = lambda: cache.add('lock_id', 'true', LOCK_EXPIRE)
    release_lock = lambda: cache.delete('lock_id')

    if acquire_lock():
        try:
            time.sleep(10)
            print('Hello', datetime.datetime.now())
        finally:
            release_lock()
    else:
        print('Ignore')

with the following configuration:

from datetime import timedelta

app.conf.update(
    CELERY_IGNORE_RESULT=True,
    CELERY_ACCEPT_CONTENT=['json'],
    CELERY_TASK_SERIALIZER='json',
    CELERY_RESULT_SERIALIZER='json',
    CELERYBEAT_SCHEDULE={
        'periodic_task': {
            'task': 'app_task_management.tasks.periodic',
            'schedule': timedelta(seconds=1),
        },
    },
)

But in the console, I never see the 'Ignore' message, and 'Hello' is printed every second. It seems that the lock is not working.

I launch the periodic task with:

celeryd -B -A my_app

and the worker with:

celery worker -A my_app -l info

Could you please correct my misunderstanding?

Upvotes: 0

Views: 1090

Answers (1)

fixmycode

Reputation: 8506

From the Django Cache Framework documentation about the local-memory cache:

Note that each process will have its own private cache instance, which means no cross-process caching is possible.

So basically each of your workers is dealing with its own private cache, so a lock acquired in one process is invisible to the others. If you need a low-resource cache backend, I would recommend the File Based Cache or the Database Cache; both allow cross-process caching.
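For example, switching to the file-based backend is just a settings change. A minimal sketch (the LOCATION path is an example; it must be a directory writable by every worker process):

# settings.py -- minimal sketch: use a file-based cache shared by all processes
CACHES = {
    'default': {
        'BACKEND': 'django.core.cache.backends.filebased.FileBasedCache',
        'LOCATION': '/var/tmp/django_cache',  # example path, adjust to your setup
    },
}

If you go with the Database Cache instead, remember to create the cache table first with python manage.py createcachetable.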

Upvotes: 2
