felix001

Reputation: 16691

Celery / Django Single Tasks being run multiple times

I'm facing an issue where a task I place on the queue is being run multiple times. From the Celery logs I can see that the same worker is running the task:

[2014-06-06 15:12:20,731: INFO/MainProcess] Received task: input.tasks.add_queue
[2014-06-06 15:12:20,750: INFO/Worker-2] starting runner..
[2014-06-06 15:12:20,759: INFO/Worker-2] collection started
[2014-06-06 15:13:32,828: INFO/Worker-2] collection complete
[2014-06-06 15:13:32,836: INFO/Worker-2] generation of steps complete
[2014-06-06 15:13:32,836: INFO/Worker-2] update created
[2014-06-06 15:13:33,655: INFO/Worker-2] email sent
[2014-06-06 15:13:33,656: INFO/Worker-2] update created
[2014-06-06 15:13:34,420: INFO/Worker-2] email sent
[2014-06-06 15:13:34,421: INFO/Worker-2] FINISH - Success

However, when I view the application's own logs, each step shows up 5-6 times.
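One thing I wondered about (an assumption on my part, since `logging_setup` isn't shown here): if `logging_setup()` attaches a fresh handler to the same named logger on every call, each log call is emitted once per attached handler, which produces exactly this kind of repetition. A minimal sketch of that effect, using a hypothetical stand-in for `logging_setup` built on the stdlib `logging` module:

```python
import io
import logging

stream = io.StringIO()

def logging_setup(app):
    # Hypothetical stand-in for the question's logging_setup():
    # it attaches a *new* handler to the same named logger on every call.
    logger = logging.getLogger(app)
    logger.setLevel(logging.INFO)
    logger.addHandler(logging.StreamHandler(stream))
    return logger

logging_setup(app="project")           # called once in the task...
logger = logging_setup(app="project")  # ...and again in project_runner.__init__

logger.info("starting runner..")

# A single .info() call, but the line is written once per attached handler:
print(stream.getvalue().count("starting runner.."))  # 2
```

If that is what is happening, guarding the setup with `if not logger.handlers:` before calling `addHandler` stops the duplication.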

I'm using Django 1.6 with RabbitMQ. I place work on the queue by calling .delay() on a task function.

This function (decorated with @task) then instantiates a class and runs it.

Does anyone have an idea of the best way to troubleshoot this?

Edit: As requested, here's the code.

views.py

In my view I'm sending my data to the queue via:

from input.tasks import add_queue_project

add_queue_project.delay(data)

tasks.py

from celery.decorators import task

@task()
def add_queue_project(data):
    """ run project """
    logger = logging_setup(app="project")

    logger.info("starting project runner..")
    f = project_runner(data)
    f.main()

class project_runner():
    """ main project runner """

    def __init__(self,data):
        self.data = data
        self.logger = logging_setup(app="project")

    def main(self):
        .... Code

settings.py

THIRD_PARTY_APPS = (
    'south',  # Database migration helpers:
    'crispy_forms',  # Form layouts
    'rest_framework',
    'djcelery',
)

import djcelery
djcelery.setup_loader()

BROKER_HOST = "127.0.0.1"
BROKER_PORT = 5672 # default RabbitMQ listening port
BROKER_USER = "test"
BROKER_PASSWORD = "test"
BROKER_VHOST = "test"
CELERY_BACKEND = "amqp" # telling Celery to report the results back to RabbitMQ
CELERY_RESULT_DBURI = ""

CELERY_IMPORTS = ("input.tasks", )

celeryd

The command I'm running to start Celery is:

python2.7 manage.py celeryd -l info

Thanks,

Upvotes: 4

Views: 2748

Answers (1)

lehins

Reputation: 9767

I don't have an exact answer for you, but there are a few things you should look into:

  • djcelery is deprecated, so if you are using a new version of Celery there may be some sort of conflict.

  • If your input app is listed in INSTALLED_APPS, Celery will discover its tasks automatically, so you don't need CELERY_IMPORTS = ("input.tasks", ). This may be the cause of your problem, since the tasks could be registered multiple times.

  • try giving your task an explicit name, e.g. @task(name='input.tasks.add'); that way Celery knows it is the same task no matter how it is imported.

Looking at your settings, it looks like you are using an old version of Celery, or an old configuration with a new version of Celery. Either way, make sure you have the newest version and try this configuration instead of what you have:

BROKER_URL = 'amqp://<user>:<password>@localhost:5672/<vhost>'
CELERY_RESULT_BACKEND = 'amqp'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
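Filling in the new-style URL with the values from the question's old settings (a sketch; the format is amqp://user:password@host:port/vhost, and the helper function here is just for illustration):

```python
# Assemble the new-style BROKER_URL from the old-style settings shown in
# the question (BROKER_USER, BROKER_PASSWORD, BROKER_HOST, BROKER_PORT,
# BROKER_VHOST).
def broker_url(user, password, host, port, vhost):
    return 'amqp://{0}:{1}@{2}:{3}/{4}'.format(user, password, host, port, vhost)

BROKER_URL = broker_url('test', 'test', '127.0.0.1', 5672, 'test')
print(BROKER_URL)  # amqp://test:test@127.0.0.1:5672/test
```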

Now, you also will have to configure celery differently:

Get rid of djcelery stuff completely.

Create proj/celery.py inside your django project:

from __future__ import absolute_import

import os

from celery import Celery

from django.conf import settings

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'settings')

app = Celery('proj')

# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

In your proj/__init__.py:

from __future__ import absolute_import

from proj.celery import app as celery_app

Then, if your input app is a reusable app and is not part of your project, use the @shared_task decorator instead of @task.
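For the task from the question, that would look something like this (a sketch; it assumes the app instance from proj/celery.py is loaded as described, and project_runner is the class from your tasks.py):

```python
# input/tasks.py -- reusable-app friendly: no direct import of proj's app
from celery import shared_task

@shared_task(name='input.tasks.add_queue_project')
def add_queue_project(data):
    """ run project """
    f = project_runner(data)
    f.main()
```

Because @shared_task binds to whichever Celery app is current, the input app stays decoupled from any one project.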

Then run celery:

celery -A proj worker -l info

Hope it helps.

Upvotes: 2
