Reputation: 7364
But when I left my computer open overnight with Celery running a task every 50 seconds, I saw some skips of up to an hour. It actually executes fine except for these unexpected skips. Why is this happening, and how can I resolve it?
Here's an example of the skip in my worker -l info log:
[2016-11-03 10:13:36,264: INFO/MainProcess] Task core.tasks.sample[8efcedc5-1e08-41c4-80b9-1f82a9ddbaad] succeeded in 1.062010367s: None
[2016-11-03 11:14:19,751: INFO/MainProcess] Received task: core.tasks.sample[ca9d6ef4-2cdc-4546-a9fb-c413541a80ee]
Here's the corresponding skip in my beat -l info log:
[2016-11-03 10:13:35,199: INFO/MainProcess] Scheduler: Sending due task core.tasks.sample (core.tasks.sample)
[2016-11-03 11:14:19,748: INFO/MainProcess] Scheduler: Sending due task core.tasks.sample (core.tasks.sample)
Here is my task code:
# imports reconstructed for completeness (Python 2 / Celery 3.x style);
# Provider, DataType, ProviderDataType, Transaction and br_transfer are
# this project's own models/helpers
from datetime import datetime, timedelta
from ftplib import FTP
from StringIO import StringIO

import xmltodict
from celery.task import periodic_task
from django.conf import settings
from django.core.cache import cache

# module-level state shared between runs (assumed initialized like this)
GLOBAL_CURRENT_DATE = datetime.now().strftime("%Y-%m-%d")

# 50 seconds
@periodic_task(run_every=timedelta(**settings.XXX_XML_PERIODIC_TASK))
def sample():
    global GLOBAL_CURRENT_DATE
    if cache.get('XXX_xml_today_saved_data') is None:
        cache.set('XXX_xml_today_saved_data', [])
    saved_data = cache.get('XXX_xml_today_saved_data')
    ftp = FTP('xxxxx')
    ftp.login(user='xxxxx', passwd='xxxxx')
    ftp.cwd('XXX')
    date_dir = GLOBAL_CURRENT_DATE.replace("-", "")
    try:
        ftp.cwd(date_dir)
    except Exception:
        # fall back to the previous day's directory
        ftp.cwd(str(int(date_dir) - 1))
    _str = StringIO()
    files = ftp.nlst()
    # a new day with nothing new on the server: roll the date over and reset
    if (GLOBAL_CURRENT_DATE != datetime.now().strftime("%Y-%m-%d") and
            files == saved_data):
        GLOBAL_CURRENT_DATE = datetime.now().strftime("%Y-%m-%d")
        cache.delete('XXX_xml_today_saved_data')
        return
    print files
    print "-----"
    print saved_data
    unsaved = list(set(files) - set(saved_data))
    print "-----"
    print unsaved
    if unsaved:
        file = min(unsaved)
        # modified_time = ftp.sendcmd('MDTM ' + file)
        print file
        ftp.retrbinary('RETR ' + file, _str.write)
        xml = '<root>'
        xml += _str.getvalue()
        xml += '</root>'
        if cache.get('XXX_provider_id') is None:
            cache.set('XXX_provider_id', Provider.objects.get(code="XXX").id)
        _id = cache.get('XXX_provider_id')
        _dict = xmltodict.parse(xml, process_namespaces=True,
                                dict_constructor=dict, attr_prefix="")
        row = _dict['root']['row']
        # a single <row> parses to a dict; normalize it to a list
        if type(_dict['root']['row']) == dict:
            _dict['root']['row'] = []
            _dict['root']['row'].append(row)
        row = _dict['root']['row']
        for x in row:
            if cache.get('XXX_data_type_' + x['dataType']) is None:
                obj, created = DataType.objects.get_or_create(code=x['dataType'])
                obj, created = ProviderDataType.objects.get_or_create(
                    provider_id=_id, data_type=obj)
                if created:
                    cache.set('XXX_data_type_' + x['dataType'], obj.id)
            _id = cache.get('XXX_data_type_' + x['dataType'])
            obj, created = Transaction.objects.get_or_create(
                data=x, file_name=file, provider_data_type_id=_id)
            if created:
                if x['dataType'] == "BR":
                    print "Transact"
                    br_transfer(**x)
                else:
                    print "Not transacting"
        saved_data.append(file)
        cache.set('XXX_xml_today_saved_data', saved_data)
    ftp.close()
Here are my Celery configs in settings.py:
BROKER_URL = 'redis://localhost:6379'
CELERY_RESULT_BACKEND = 'redis://localhost:6379'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Africa/Nairobi'
XXX_XML_PERIODIC_TASK = {'seconds': 50}
CACHES = {
    'default': {
        'BACKEND': 'redis_cache.RedisCache',
        'LOCATION': 'localhost:6379',
        'TIMEOUT': None,
    },
}
Any explanations or suggestions?
I am using Python 2.7.10 and Django 1.10.
Upvotes: 0
Views: 1187
Reputation: 6973
There could be a couple of problems. The most likely is that your worker is busy when your task is triggered. You can prevent this by running more workers: the docs explain the --concurrency option for a single worker, as well as how to run multiple worker instances.
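For illustration, using the same old-style setting names the question already has, concurrency can also be set in settings.py (a sketch; 4 is an arbitrary value to tune for your workload):

# settings.py -- equivalent to the --concurrency=4 command-line option
CELERYD_CONCURRENCY = 4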
You could also run different workers attached to different queues so that certain tasks are always handled by certain workers, i.e., dedicated queues for certain tasks: Starting worker with dynamic routing_key?
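As a sketch of that idea with the question's old-style settings (the queue name 'xml_ftp' is purely illustrative), you'd route the task to its own queue and then start a dedicated worker for it with the -Q xml_ftp option:

# settings.py -- send the periodic task to a dedicated queue
CELERY_ROUTES = {
    'core.tasks.sample': {'queue': 'xml_ftp'},
}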
What I've also seen is that a worker can prefetch tasks and hold onto them; if the task it is currently running takes longer than the next task's countdown, the prefetched task gets delayed.
You'll want to read up on CELERYD_PREFETCH_MULTIPLIER.
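A minimal sketch, again using the old-style setting names: limiting the multiplier to 1 makes each worker process reserve only one task at a time instead of holding queued tasks behind a long-running one:

# settings.py -- reserve one task per worker process at a time
CELERYD_PREFETCH_MULTIPLIER = 1
# often paired with late acknowledgement so a reserved task is re-queued
# if the worker dies mid-run
CELERY_ACKS_LATE = True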
Upvotes: 1
Reputation: 871
Celery workers pop tasks from a queue when they're ready, but if a task has a countdown, the worker will pop other tasks in the meantime and work on those while it waits for the countdown to expire. Celery doesn't guarantee that a task runs at exactly that time, only at that time or later.
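For illustration (a hypothetical call against the task from the question), a countdown only sets the earliest moment the task may start:

from core.tasks import sample

# runs no earlier than 50 seconds from now; if every worker process is
# busy at that moment, it runs later -- "at least at that time", not exactly
sample.apply_async(countdown=50)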
Upvotes: 1