Ryu_hayabusa

Reputation: 3722

Celery task duplication issue

I have 3 Celery beat instances running (each using a different settings.py) on my VPS. The three instances serve three websites that share the same codebase. The task basically sends email to several hundred registered users (using SendGrid).

My problem is that my task runs 3 times when it is scheduled with an ETA, as follows:

    sdate = datetime.datetime.strptime(request.POST['schedule_date'], '%d-%m-%Y %H:%M')
    tz = get_current_timezone()
    celery_scheduled_campaign.apply_async(eta=tz.localize(sdate),
                                          kwargs={'schedule_id': schedule.id})

but it runs as expected (only once) when using the .delay method:

celery_sendmail_task.delay(pro_campaign, unsubscribe_url, ecm_host)

settings_one.py

...
BROKER_URL = 'redis://localhost:6379/0'
...

settings_two.py

...
BROKER_URL = 'redis://localhost:6379/1'
...

settings_three.py

...
BROKER_URL = 'redis://localhost:6379/2'
...

task.py

from celery import task
from bulkmailer import send_email
from models import CampaignSchedule, SendgridEmailQuota
import logging
logger = logging.getLogger("ecm_console")
#import pdb
#import time
#from django.core.mail import EmailMultiAlternatives

@task.task(ignore_result=True)
def celery_sendmail_task(obj,unsubscribe_url,host):
    #time.sleep(10)
    send_email(obj,unsubscribe_url,host)
    obj.status=True
    if obj.campaign_opt=='S':
        obj.campaign_opt='R'
    obj.save()

@task.task(ignore_result=True)
def sendgrid_quota_reset():
    try:
        quota = SendgridEmailQuota.objects.get(pk=1)
        quota.used=0
        quota.save()
        logger.info("Success : sendgrid_quota_reset job ")
    except Exception, e:
        logger.error("Critical Error : sendgrid_quota_reset: {0} ".format(e))

@task.task(ignore_result=True)
def celery_scheduled_campaign(schedule_id):
    try:
        obj = CampaignSchedule.objects.get(pk=schedule_id)
        send_email(obj.campaign, obj.unsub_url, obj.ecm_host)
        obj.campaign.status = True
        obj.campaign.save()
    except Exception, e:
        logger.error("Critical Error : celery_scheduled_campaign: {0} ".format(e))

commands used to run celery

python manage.py celery worker -B -c 2 --loglevel=info --settings=ecm.settings_one

python manage.py celery worker -B -c 2 --loglevel=info --settings=ecm.settings_two

python manage.py celery worker -B -c 2 --loglevel=info --settings=ecm.settings_three

versions

celery==3.0.21 django-celery==3.0.21 Python 2.7.3

EDIT 1: The Celery log shows the task being automatically added again, hours after it was first received.

[2014-11-24 22:09:32,521: INFO/MainProcess] Celerybeat: Shutting down...
[2014-11-24 22:09:32,557: WARNING/MainProcess] Restoring 1 unacknowledged message(s).
[2014-11-24 22:09:40,495: INFO/Beat] Celerybeat: Starting...
[2014-11-24 22:09:40,540: WARNING/MainProcess] celery@mailer ready.
[2014-11-24 22:09:40,547: INFO/MainProcess] consumer: Connected to redis://localhost:6379/3.
[2014-11-24 22:09:40,614: INFO/MainProcess] Got task from broker: ecm_core.tasks.celery_scheduled_campaign[f5c82a1d-3996-4266-9023-3f7e07538e84] eta:[2014-11-25 09:00:00+04:00]

^^ This is where I added the task from the front-end. The tasks below are being added automatically.

[2014-11-24 23:09:53,039: INFO/MainProcess] Got task from broker: ecm_core.tasks.celery_scheduled_campaign[f5c82a1d-3996-4266-9023-3f7e07538e84] eta:[2014-11-25 09:00:00+04:00]

A periodic task without an ETA running normally VV

[2014-11-25 00:01:00,044: INFO/Beat] Scheduler: Sending due task ecm_sendgrid_sync (ecm_sendgridapi.tasks.ecm_sendgridapi_dbsync)
[2014-11-25 00:01:00,052: INFO/MainProcess] Got task from broker: ecm_sendgridapi.tasks.ecm_sendgridapi_dbsync[37c94a3a-f6c2-433c-81a3-ae351c7018f8]
[2014-11-25 00:01:02,262: INFO/MainProcess] Success : update job  
[2014-11-25 00:01:02,265: INFO/MainProcess] Task ecm_sendgridapi.tasks.ecm_sendgridapi_dbsync[37c94a3a-f6c2-433c-81a3-ae351c7018f8] succeeded in 2.18759179115s: None

Again, the task with the ETA keeps getting added automatically. Notice that the task ID is the same.

[2014-11-25 00:10:12,190: INFO/MainProcess] Got task from broker: ecm_core.tasks.celery_scheduled_campaign[f5c82a1d-3996-4266-9023-3f7e07538e84] eta:[2014-11-25 09:00:00+04:00]
[2014-11-25 01:10:26,029: INFO/MainProcess] Got task from broker: ecm_core.tasks.celery_scheduled_campaign[f5c82a1d-3996-4266-9023-3f7e07538e84] eta:[2014-11-25 09:00:00+04:00]
[2014-11-25 02:10:39,025: INFO/MainProcess] Got task from broker: ecm_core.tasks.celery_scheduled_campaign[f5c82a1d-3996-4266-9023-3f7e07538e84] eta:[2014-11-25 09:00:00+04:00]
[2014-11-25 03:10:50,063: INFO/MainProcess] Got task from broker: ecm_core.tasks.celery_scheduled_campaign[f5c82a1d-3996-4266-9023-3f7e07538e84] eta:[2014-11-25 09:00:00+04:00]
[2014-11-25 04:00:00,007: INFO/Beat] Scheduler: Sending due task celery.backend_cleanup (celery.backend_cleanup)
[2014-11-25 04:00:00,064: INFO/MainProcess] Got task from broker: celery.backend_cleanup[35a4db80-008e-49c9-9735-2dc1df5e0ecc] expires:[2014-11-25 16:00:00.008296+04:00]
[2014-11-25 04:00:01,533: INFO/MainProcess] Task celery.backend_cleanup[35a4db80-008e-49c9-9735-2dc1df5e0ecc] succeeded in 1.01458001137s: None
[2014-11-25 04:11:03,062: INFO/MainProcess] Got task from broker: ecm_core.tasks.celery_scheduled_campaign[f5c82a1d-3996-4266-9023-3f7e07538e84] eta:[2014-11-25 09:00:00+04:00]
[2014-11-25 05:11:15,073: INFO/MainProcess] Got task from broker: ecm_core.tasks.celery_scheduled_campaign[f5c82a1d-3996-4266-9023-3f7e07538e84] eta:[2014-11-25 09:00:00+04:00]
[2014-11-25 06:11:26,101: INFO/MainProcess] Got task from broker: ecm_core.tasks.celery_scheduled_campaign[f5c82a1d-3996-4266-9023-3f7e07538e84] eta:[2014-11-25 09:00:00+04:00]
[2014-11-25 07:11:38,324: INFO/MainProcess] Got task from broker: ecm_core.tasks.celery_scheduled_campaign[f5c82a1d-3996-4266-9023-3f7e07538e84] eta:[2014-11-25 09:00:00+04:00]
[2014-11-25 08:11:53,097: INFO/MainProcess] Got task from broker: ecm_core.tasks.celery_scheduled_campaign[f5c82a1d-3996-4266-9023-3f7e07538e84] eta:[2014-11-25 09:00:00+04:00]

This may be a bug in this old version. I also suspect my VPS; it is low on memory (400+ MB of 489 MB used).
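
The hourly re-delivery pattern also matches the Redis transport's visibility timeout (one hour by default): a message that is not acknowledged within that window is delivered again, which is what happens to a task whose ETA lies many hours ahead. A rough sketch of raising the timeout above the longest expected ETA (the 12-hour value is just an example, not from my actual settings):

settings_*.py

    ...
    # Re-deliver unacknowledged messages only after 12 hours instead of 1 hour,
    # so tasks with a far-future ETA are not picked up repeatedly.
    BROKER_TRANSPORT_OPTIONS = {'visibility_timeout': 43200}  # seconds
    ...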

Upvotes: 3

Views: 4400

Answers (2)

Ryu_hayabusa

Reputation: 3722

Finally found a fix: I added a lock mechanism to ensure the task is only executed once. More details here.

task.py

# ...
import redis

@task.task(ignore_result=True)
def celery_scheduled_campaign(schedule_id):
    LOCK_EXPIRE = 60 * 30  # Lock expires in 30 minutes
    obj = campaign.objects.get(pk=schedule_id)
    # One lock per campaign: a duplicate delivery of the same task finds the
    # lock already held and skips the send.
    my_lock = redis.Redis().lock(obj.campaign_uuid, timeout=LOCK_EXPIRE)
    if my_lock.acquire(blocking=False) and obj.is_complete == False:
        # ...
        # Task to run
        # ...
        obj.is_complete = True
        obj.save()
        my_lock.release()

models.py

# ...
import uuid

class campaign(models.Model):
    # ...
    campaign_uuid = models.CharField(editable=False, max_length=100)
    is_complete = models.BooleanField(default=False)
    # ...
    def save(self, *args, **kwargs):
        if not self.id:
            self.campaign_uuid = str(uuid.uuid4())
        super(campaign, self).save(*args, **kwargs)
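
To illustrate why this works, here is a small standalone snippet (my own sketch, the 'campaign-1234' key is made up): while the first delivery holds the lock, a duplicate delivery that tries a non-blocking acquire on the same key simply fails and skips the send.

    import redis

    r = redis.Redis()
    first = r.lock('campaign-1234', timeout=60)
    duplicate = r.lock('campaign-1234', timeout=60)

    print(first.acquire(blocking=False))      # True  -> first delivery runs the task
    print(duplicate.acquire(blocking=False))  # False -> duplicate delivery is skipped
    first.release()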

Upvotes: 5

doniyor

Reputation: 37914

Make sure the messages for all 3 sites are not going to the same broker port/database, which would cause multiple Celery instances to consume from the same broker.
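
A quick way to check is to look at the pending messages in each Redis database. Assuming the default queue name "celery" used by the Redis transport, something like this sketch should show each site's tasks only in its own database:

    import redis

    # Each site should only ever enqueue into its own database (0, 1 or 2).
    for db in (0, 1, 2):
        r = redis.Redis(host='localhost', port=6379, db=db)
        print('db %d: %d pending' % (db, r.llen('celery')))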

Upvotes: 1
