Dean Christian Armada

Reputation: 7364

Celery skips task by an hour

I left my computer running Celery overnight with a task scheduled every 50 seconds, and I noticed some skips of about an hour. Otherwise it executes fine; the only problem is these unexpected skips. Why is this happening, and how can I resolve it?

Here's an example of the skip in my worker -l info log:

[2016-11-03 10:13:36,264: INFO/MainProcess] Task core.tasks.sample[8efcedc5-1e08-41c4-80b9-1f82a9ddbaad] succeeded in 1.062010367s: None
[2016-11-03 11:14:19,751: INFO/MainProcess] Received task: core.tasks.sample[ca9d6ef4-2cdc-4546-a9fb-c413541a80ee]

And here's the corresponding skip in my beat -l info log:

[2016-11-03 10:13:35,199: INFO/MainProcess] Scheduler: Sending due task core.tasks.sample (core.tasks.sample)
[2016-11-03 11:14:19,748: INFO/MainProcess] Scheduler: Sending due task core.tasks.sample (core.tasks.sample)

Here is my task code:

# Standard imports; Provider, DataType, ProviderDataType, Transaction,
# and br_transfer come from the project's own modules
from datetime import datetime, timedelta
from ftplib import FTP, error_perm
from StringIO import StringIO

import xmltodict
from celery.task import periodic_task  # Celery 3.x
from django.conf import settings
from django.core.cache import cache

# Module-level state tracking which day's FTP directory is being read
GLOBAL_CURRENT_DATE = datetime.now().strftime("%Y-%m-%d")


# Runs every 50 seconds (XXX_XML_PERIODIC_TASK = {'seconds': 50})
@periodic_task(run_every=timedelta(**settings.XXX_XML_PERIODIC_TASK))
def sample():
    global GLOBAL_CURRENT_DATE
    if cache.get('XXX_xml_today_saved_data') is None:
        cache.set('XXX_xml_today_saved_data', [])
    saved_data = cache.get('XXX_xml_today_saved_data')
    ftp = FTP('xxxxx')
    ftp.login(user='xxxxx', passwd='xxxxx')
    ftp.cwd('XXX')
    date_dir = GLOBAL_CURRENT_DATE.replace("-", "")
    try:
        ftp.cwd(date_dir)
    except error_perm:
        # Today's directory doesn't exist yet; fall back to yesterday's
        ftp.cwd(str(int(date_dir) - 1))
    _str = StringIO()
    files = ftp.nlst()
    # Roll over to the new day once every file of the old day is saved
    if (GLOBAL_CURRENT_DATE != datetime.now().strftime("%Y-%m-%d") and
            files == saved_data):
        GLOBAL_CURRENT_DATE = datetime.now().strftime("%Y-%m-%d")
        cache.delete('XXX_xml_today_saved_data')
        return
    print files
    print "-----"
    print saved_data
    unsaved = list(set(files) - set(saved_data))
    print "-----"
    print unsaved
    if unsaved:
        filename = min(unsaved)
        # modified_time = ftp.sendcmd('MDTM ' + filename)
        print filename
        ftp.retrbinary('RETR ' + filename, _str.write)
        xml = '<root>' + _str.getvalue() + '</root>'
        if cache.get('XXX_provider_id') is None:
            cache.set('XXX_provider_id', Provider.objects.get(code="XXX").id)
        provider_id = cache.get('XXX_provider_id')
        _dict = xmltodict.parse(xml, process_namespaces=True,
                                dict_constructor=dict, attr_prefix="")
        row = _dict['root']['row']
        # A single <row> parses to a dict; normalize it to a one-element list
        if isinstance(row, dict):
            row = [row]
        for x in row:
            if cache.get('XXX_data_type_' + x['dataType']) is None:
                data_type, _ = DataType.objects.get_or_create(code=x['dataType'])
                pdt, _ = ProviderDataType.objects.get_or_create(
                    provider_id=provider_id, data_type=data_type)
                # Cache the id whether or not the row was just created,
                # otherwise the lookup below stays None forever
                cache.set('XXX_data_type_' + x['dataType'], pdt.id)
            pdt_id = cache.get('XXX_data_type_' + x['dataType'])
            obj, created = Transaction.objects.get_or_create(
                data=x, file_name=filename, provider_data_type_id=pdt_id)
            if created:
                if x['dataType'] == "BR":
                    print "Transact"
                    br_transfer(**x)
            else:
                print "Not transacting"

        saved_data.append(filename)
        cache.set('XXX_xml_today_saved_data', saved_data)
    ftp.close()

Here are my Celery settings in settings.py:

BROKER_URL = 'redis://localhost:6379'
CELERY_RESULT_BACKEND = 'redis://localhost:6379'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Africa/Nairobi'
XXX_XML_PERIODIC_TASK = {'seconds': 50}

CACHES = {
    'default': {
        'BACKEND': 'redis_cache.RedisCache',
        'LOCATION': 'localhost:6379',
        'TIMEOUT': None,
    },
}

Any explanations or suggestions?

I am using Python 2.7.10 and Django 1.10.

Upvotes: 0

Views: 1187

Answers (2)

rrauenza

Reputation: 6973

There could be a couple of problems. The most likely is that your worker is busy when your task is triggered. You can prevent this by adding more worker capacity: the docs describe a --concurrency option for a single worker, as well as options for running multiple worker instances.
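A minimal sketch of the settings equivalent, assuming Celery 3.x setting names (the same value can be passed as --concurrency when starting the worker):

# settings.py -- assuming Celery 3.x setting names; equivalent to
# starting the worker with --concurrency=4
CELERYD_CONCURRENCY = 4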

You could also run different workers attached to different queues, so that certain tasks are routed to certain workers, i.e., dedicated queues for certain tasks: Starting worker with dynamic routing_key?
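A sketch of that routing, again assuming Celery 3.x setting names; 'xml_polling' is a hypothetical queue name:

# settings.py -- route the periodic task to its own queue so slow tasks
# on the default queue can't delay it ('xml_polling' is hypothetical)
CELERY_ROUTES = {
    'core.tasks.sample': {'queue': 'xml_polling'},
}

A dedicated worker started with -Q xml_polling would then consume only that queue.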

I've also seen a worker prefetch tasks and hold onto them; if the task it is currently running runs past the countdown, your task may be delayed.

You'll want to read up on CELERYD_PREFETCH_MULTIPLIER.
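For instance, a common mitigation (using the Celery 3.x setting name) is to reserve only one message per worker process at a time:

# settings.py -- with a multiplier of 1, each worker process reserves
# only one message at a time, so a long-running task does not sit on
# a backlog of prefetched tasks (Celery 3.x setting name)
CELERYD_PREFETCH_MULTIPLIER = 1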

Upvotes: 1

theWanderer4865

Reputation: 871

Celery workers pop tasks from the queue when they're ready, but if a task has a countdown the worker will pop other tasks in the meantime and wait for the countdown to expire while doing other things. Celery doesn't guarantee that a task runs at exactly that time, just at that time or later.
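To illustrate with the task from the question (a sketch using the standard apply_async API):

# 'countdown' is a floor, not an exact schedule: the worker won't run
# the task earlier than 50 seconds from now, but it may run it later
# if it is busy with something else
sample.apply_async(countdown=50)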

Upvotes: 1
