Reputation: 16691
I'm facing an issue where I'm placing a task into the queue and it is being run multiple times. From the celery logs I can see that the same worker is running the task ...
[2014-06-06 15:12:20,731: INFO/MainProcess] Received task: input.tasks.add_queue
[2014-06-06 15:12:20,750: INFO/Worker-2] starting runner..
[2014-06-06 15:12:20,759: INFO/Worker-2] collection started
[2014-06-06 15:13:32,828: INFO/Worker-2] collection complete
[2014-06-06 15:13:32,836: INFO/Worker-2] generation of steps complete
[2014-06-06 15:13:32,836: INFO/Worker-2] update created
[2014-06-06 15:13:33,655: INFO/Worker-2] email sent
[2014-06-06 15:13:33,656: INFO/Worker-2] update created
[2014-06-06 15:13:34,420: INFO/Worker-2] email sent
[2014-06-06 15:13:34,421: INFO/Worker-2] FINISH - Success
However, when I view the actual logs of the application, it shows 5-6 log lines for each step (??).
I'm using Django 1.6 with RabbitMQ. Tasks are placed into the queue by calling .delay() on a function.
This function (which has the task decorator added) then calls a class which is run.
Does anyone have any idea of the best way to troubleshoot this?
Edit: As requested, here's the code.
views.py
In my view I'm sending my data to the queue via ...
from input.tasks import add_queue_project
add_queue_project.delay(data)
tasks.py
from celery.decorators import task

@task()
def add_queue_project(data):
    """ run project """
    logger = logging_setup(app="project")
    logger.info("starting project runner..")
    f = project_runner(data)
    f.main()


class project_runner():
    """ main project runner """

    def __init__(self, data):
        self.data = data
        self.logger = logging_setup(app="project")

    def main(self):
        .... Code
settings.py
THIRD_PARTY_APPS = (
    'south',          # Database migration helpers
    'crispy_forms',   # Form layouts
    'rest_framework',
    'djcelery',
)
import djcelery
djcelery.setup_loader()
BROKER_HOST = "127.0.0.1"
BROKER_PORT = 5672 # default RabbitMQ listening port
BROKER_USER = "test"
BROKER_PASSWORD = "test"
BROKER_VHOST = "test"
CELERY_BACKEND = "amqp" # telling Celery to report the results back to RabbitMQ
CELERY_RESULT_DBURI = ""
CELERY_IMPORTS = ("input.tasks", )
celeryd
The line I'm running to start celery is:
python2.7 manage.py celeryd -l info
Thanks,
Upvotes: 4
Views: 2748
Reputation: 9767
I don't have an exact answer for you, but there are a few things you should look into:
- djcelery is deprecated, so if you are using a new version of celery there may be some sort of conflict.
- If your input app is listed in INSTALLED_APPS, celery will discover it, so you don't need to add it to CELERY_IMPORTS = ("input.tasks", ). This may be the cause of your problem, since the tasks could be loaded multiple times, once under each import path; a quick way to check for duplicate registrations is sketched after this list.
- Try giving your task a name, @task(name='input.tasks.add'), so celery will know that it is the same task, no matter how you import it.
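To see whether the same task really has been registered more than once, you can ask a running worker for the task names it knows about. A minimal sketch, assuming an app object like the one created in proj/celery.py further down:

from proj.celery import app as celery_app

# registered() returns {worker_name: [task_name, ...]}, or None if no
# worker replied in time.
replies = celery_app.control.inspect().registered() or {}
for worker, task_names in replies.items():
    print(worker)
    for name in sorted(task_names):
        # The same task showing up under two module paths, e.g.
        # 'tasks.add_queue_project' and 'input.tasks.add_queue_project',
        # points at the double-import problem described above.
        print('  ', name)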
Looking at your settings, it looks like you are using an old version of celery, or you are using your old configuration with a new version of celery. In any case, make sure you have the newest version and try this configuration instead of what you have:
BROKER_URL = 'amqp://<user>:<password>@localhost:5672/<vhost>'
CELERY_RESULT_BACKEND = 'amqp'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
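For example, with the broker values from the settings.py in the question (user test, password test, vhost test on 127.0.0.1), that URL would read:

BROKER_URL = 'amqp://test:test@127.0.0.1:5672/test'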
Now, you also will have to configure celery differently:
Get rid of the djcelery stuff completely.
Create proj/celery.py inside your Django project:
from __future__ import absolute_import

import os

from celery import Celery

from django.conf import settings

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'settings')

app = Celery('proj')

# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))
In your proj/__init__.py:
from __future__ import absolute_import
from proj.celery import app as celery_app
Then, if your input app is a reusable app and is not part of your project, use the @shared_task decorator instead of @task, for example:
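A minimal sketch of the question's task rewritten that way (assuming the project_runner class and logging_setup helper from the question are unchanged):

from celery import shared_task

@shared_task(name='input.tasks.add_queue_project')
def add_queue_project(data):
    """ run project """
    logger = logging_setup(app="project")
    logger.info("starting project runner..")
    # Delegate the actual work to the question's project_runner class.
    project_runner(data).main()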
Then run celery:
celery -A proj worker -l info
Hope it helps.
Upvotes: 2