Reputation: 64739
I have a Celery task like:
from celery.task import task
from django.conf import settings
from base.tasks import BaseTask
@task(name="throw_exception", base=BaseTask)
def print_value(*args, **kwargs):
    print('BROKER_URL:', settings.BROKER_URL)
and I'm running a Celery worker inside my virtualenv like:
celery worker -A myproject -l info
The Worker shows:
Connected to amqp://guest:**@127.0.0.1:5672/myapp
And when I launch my task from the Django shell with:
>>> from django.conf import settings
>>> settings.BROKER_URL
'amqp://guest:**@127.0.0.1:5672/myapp'
>>> from myapp.tasks import print_value
>>> print_value.delay()
I never see the task executed in my worker's log.
However, if I instead change my worker to use a BROKER_URL with the default "/" vhost, it immediately executes all the pending tasks, implying all my calls to print_value.delay() are sending tasks to the wrong vhost even though the correct BROKER_URL is set. What am I doing wrong?
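(Aside: the vhost is just the path component of the AMQP URL, so a quick stdlib sanity check of which vhost a given broker URL points at looks like this; the URLs below mirror the ones above.)

```python
from urllib.parse import urlparse

def broker_vhost(broker_url):
    """Return the vhost component of an AMQP broker URL."""
    path = urlparse(broker_url).path
    # An empty path means the default "/" vhost; otherwise drop the leading slash.
    return '/' if path in ('', '/') else path.lstrip('/')

print(broker_vhost('amqp://guest:guest@127.0.0.1:5672/myapp'))  # myapp
print(broker_vhost('amqp://guest:guest@127.0.0.1:5672/'))       # /
```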
Edit: The problem seems to be that Celery doesn't have a consistent @task decorator, and by using the wrong one, you disconnect the task from your broker settings. So essentially, all my tasks were configured to use the default broker instead of the one defined in my settings. The old docs say to use from celery.task import task, but the new docs don't really specify, and seem to imply you should use the app instance defined in your celery.py file, as in @app.task. The problem with this is that all my tasks are in separate tasks.py files, where they can't access the app instance. If I copy a task into my celery.py and use the @app.task decorator, then it uses the correct vhost and works as expected, but obviously this isn't a practical fix, because I'd have to copy dozens of functions into that file. How do I fix this properly?
Upvotes: 5
Views: 3806
Reputation: 2947
Having the same question myself right now using Django + (Celery + RabbitMQ). My solution was:
CELERY_BROKER_URL = 'amqp://<user>:<password>@localhost:5672/<vhost>'
Here's a detailed confirmation from RabbitMQ.com > Client Documentation > RabbitMQ URI Specification.
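Worth noting (an assumption about the setup, since the question's settings use un-prefixed BROKER_URL): the CELERY_ prefix is only honoured when the Celery app is configured with the matching namespace. A sketch of the two pieces together, with placeholder credentials:

```python
# celery.py -- namespace='CELERY' means every Celery setting in Django's
# settings.py must carry the CELERY_ prefix (CELERY_BROKER_URL, ...):
# app.config_from_object('django.conf:settings', namespace='CELERY')

# settings.py (user/password/vhost are placeholders):
CELERY_BROKER_URL = 'amqp://myuser:mypassword@localhost:5672/myvhost'

# Without the namespace argument, the un-prefixed name is read instead:
# BROKER_URL = 'amqp://myuser:mypassword@localhost:5672/myvhost'
```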
For what it's worth...
...there's a lot going on with Celery + RabbitMQ. I was looking at RabbitMQ with rabbitmqctl list_vhosts -- and I didn't see my vhost. WTF? Finally, I realized I had configured supervisord too early on my local development server. Starting Celery from the CLI gives a bunch of feedback that supervisord puts somewhere not directly under my nose, like:
[2021-02-19 18:26:52,803: WARNING/MainProcess] (0, 0): (403) ACCESS_REFUSED - Login was refused using authentication mechanism AMQPLAIN. For details see the broker logfile.
AMQP immediately reminded me of the connection string. Boom. There's your vhost.
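If the vhost really is missing from rabbitmqctl list_vhosts, it has to be created and the user granted permissions on it before logins stop being refused. A sketch, assuming the guest user and a vhost named myapp:

```shell
# Create the vhost, then grant "guest" configure/write/read access to it.
rabbitmqctl add_vhost myapp
rabbitmqctl set_permissions -p myapp guest ".*" ".*" ".*"
```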
Upvotes: 4
Reputation: 64739
After digging through Celery's code, the only way I could find to set the current app was to call celery._state._set_current_app(app). Obviously, this is an internal method and not designed to be used this way, but I couldn't find any other way to explicitly set my custom app instance as the "current" app. I would have thought this should be done automatically, especially since my code is taken directly from the tutorial, so either the docs are incomplete or this is a bug.
In any case, the working celery file looks like:
from __future__ import absolute_import, print_function
import os
import sys
from celery import Celery
from celery._state import _set_current_app
import django
app = Celery('myproject')
app.config_from_object('django.conf:settings', namespace='CELERY')
_set_current_app(app)
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings.settings')
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '../myproject')))
django.setup()
from django.conf import settings
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
That caused all the @task decorators in all my tasks.py files to correctly access my custom Celery instance.
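For what it's worth, @shared_task is Celery's documented way to define tasks in per-app tasks.py files without importing the app instance: it defers app binding until the task is used, so it picks up the project's Celery app (and its broker settings) rather than an implicit default app. A sketch, not tested against this exact setup:

```python
# myapp/tasks.py -- sketch using shared_task instead of the legacy
# celery.task decorator; the task binds to whatever app is current.
from celery import shared_task
from django.conf import settings

@shared_task(name="throw_exception")
def print_value(*args, **kwargs):
    print('BROKER_URL:', settings.BROKER_URL)
```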
Upvotes: 1
Reputation: 7717
Here's a demo using djcelery with a different vhost, which I've tested.
In your project folder, __init__.py:
from __future__ import absolute_import
# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app
celery.py (replace SchoolMS with your own project label):
from __future__ import absolute_import
import os
from celery import Celery, platforms
from django.conf import settings
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'SchoolMS.settings')
app = Celery('SchoolMS')
platforms.C_FORCE_ROOT = True
# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
settings.py:
BROKER_URL = 'amqp://schoolms:schoolms@localhost:5672/schoolms'
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'
CELERYBEAT_SCHEDULE = {
}
user/tasks.py:
from celery import task
from django.conf import settings
from django.core.exceptions import ObjectDoesNotExist

# TelVerify, send_sms and SmsError come from the app's own modules.
@task
def send_tel_verify(tel_verify_id):
    try:
        tel_verify = TelVerify.objects.get(id=tel_verify_id)
        try:
            send_sms(tel_verify.tel, 'xxxx')
            return 'success'
        except SmsError as e:
            return 'error'
    except ObjectDoesNotExist:
        return 'not found'
user/views.py:
send_tel_verify.delay(tel_verify.id)
Upvotes: 0