Reputation: 869
I want to create a background task to update a record on a specific date. I'm using Django and Celery with RabbitMQ.
I've managed to get the task called when the model is saved with this dummy task function:
tasks.py
from __future__ import absolute_import
from celery import Celery
from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)
app = Celery('tasks', broker='amqp://localhost//')
@app.task(name='news.tasks.update_news_status')
def update_news_status(news_id):
    # (I pass the news id and return it, nothing complicated about it)
    return news_id
This task is called from the save() method in my models.py:
from django.db import models
from celery import current_app
class News(models.Model):
    (...)
    def save(self, *args, **kwargs):
        current_app.send_task('news.tasks.update_news_status', args=(self.id,))
        super(News, self).save(*args, **kwargs)
Thing is, I want to import my News model in tasks.py, but if I try it like this:
from .models import News
I get this error:
django.core.exceptions.ImproperlyConfigured: Requested setting DEFAULT_INDEX_TABLESPACE, but settings are not configured. You must either define the environment variable DJANGO_SETTINGS_MODULE or call settings.configure() before accessing settings.
This is what my celery.py looks like:
from __future__ import absolute_import, unicode_literals
from celery import Celery
import os
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myapp.settings')
app = Celery('myapp')
# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')
# Load task modules from all registered Django app configs.
app.autodiscover_tasks()
@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))
I have already tried this and ran into different errors, but in the end I'm not able to import any module in tasks.py.
There might be something wrong with my config, but I can't see the error. I followed the steps in the Celery docs: First steps with Django.
Also, my project structure looks like this:
├── myapp
│   ├── __init__.py
│   ├── celery.py
│   ├── settings.py
│   ├── urls.py
│   └── wsgi.py
├── news
│   ├── __init__.py
│   ├── admin.py
│   ├── apps.py
│   ├── tasks.py
│   ├── urls.py
│   ├── models.py
│   └── views.py
└── manage.py
I'm executing the worker from the myapp directory like this:
celery -A news.tasks worker --loglevel=info
What am I missing here? Thanks in advance for your help!
EDIT
After making the changes suggested in the comments, i.e. adding this to celery.py:
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
and moving the import inside the method in tasks.py:
from __future__ import absolute_import
from celery import Celery
from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)
app = Celery('tasks', broker='amqp://localhost//')
@app.task(name='news.tasks.update_news_status')
def update_news_status(news_id):
    from .models import News
    return news_id
I get the following error:
[2018-07-20 12:24:29,337: ERROR/ForkPoolWorker-1] Task news.tasks.update_news_status[87f9ec92-c260-4ee9-a3bc-5f684c819f79] raised unexpected: ValueError('Attempted relative import in non-package',)
Traceback (most recent call last):
  File "/Users/carla/Develop/App/backend/myapp-venv/lib/python2.7/site-packages/celery/app/trace.py", line 382, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/Users/carla/Develop/App/backend/myapp-venv/lib/python2.7/site-packages/celery/app/trace.py", line 641, in __protected_call__
    return self.run(*args, **kwargs)
  File "/Users/carla/Develop/App/backend/news/tasks.py", line 12, in update_news_status
    from .models import News
ValueError: Attempted relative import in non-package
Upvotes: 4
Views: 7401
Reputation: 2061
Here's what I would do (Django 1.11 and Celery 4.2). You have a problem in your Celery config: you try to re-declare the Celery instance.
tasks.py
from myapp.celery import app # would contain what you need :)
from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)
@app.task(name='news.tasks.update_news_status')
def update_news_status(news_id):
    # (I pass the news id and return it, nothing complicated about it)
    return news_id
celery.py
from __future__ import absolute_import, unicode_literals
from celery import Celery
from django.conf import settings
import os
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "myapp.settings")
app = Celery('myapp', backend='rpc://', broker=BROKER_URL)  # your config here; BROKER_URL must be defined or imported
app.config_from_object('django.conf:settings', namespace='CELERY')  # change here
app.autodiscover_tasks()
models.py
from django.db import models
class News(models.Model):
    (...)
    def save(self, *args, **kwargs):
        super(News, self).save(*args, **kwargs)
        from news.tasks import update_news_status
        update_news_status.delay(self.id)  # change here
And launch it with celery -A myapp worker --loglevel=info, because your app is defined in myapp.celery, so the -A parameter needs to point to the package where the app and its config are declared.
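For myapp.celery to be importable from your tasks like this, the Celery docs' First steps with Django guide (which the question links to) also loads the app in the project's __init__.py; a minimal sketch of myapp/__init__.py following that pattern:
from __future__ import absolute_import, unicode_literals
# Make sure the Celery app is always imported when Django starts,
# so that tasks.py can do `from myapp.celery import app`.
from .celery import app as celery_app
__all__ = ('celery_app',)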
Upvotes: 0
Reputation: 869
Ok so for anyone struggling with this... turns out my celery.py wasn't reading env variables from the settings.
After a week and lots of research I realised that Celery is not a Django process but a process running outside of it (duh), so when I tried to load the settings they were loaded, but I then wasn't able to access the env variables I had defined in my .env (I use the dotenv library). Celery was trying to look up the env variables in my .bash_profile (of course).
So in the end my solution was to create a helper module called load_env.py in the same directory where my celery.py is defined, with the following:
from os.path import dirname, join
import dotenv
def load_env():
    "Get the path to the .env file and load it."
    project_dir = dirname(dirname(__file__))
    dotenv.read_dotenv(join(project_dir, '.env'))
and then in my celery.py (note the last import and the first instruction):
from __future__ import absolute_import, unicode_literals
from celery import Celery
from django.conf import settings
import os
from .load_env import load_env
load_env()
# set the default Django settings module for the 'celery' program.
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "myapp.settings")
app = Celery('myapp')
# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object('myapp.settings', namespace='CELERY')
# Load task modules from all registered Django app configs.
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
After the call to load_env(), the env variables are loaded and the Celery worker has access to them. By doing this I am now able to access other modules from my tasks.py, which was my main problem.
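To make the ordering concrete, here is a hypothetical sketch of the kind of lookups my settings.py does (the variable names are illustrative assumptions, not my actual settings):
import os
# These lookups only work if the .env file has already been loaded into
# os.environ, which is why celery.py must call load_env() before it sets
# DJANGO_SETTINGS_MODULE and the settings module gets imported.
SECRET_KEY = os.environ['SECRET_KEY']
CELERY_BROKER_URL = os.environ.get('CELERY_BROKER_URL', 'amqp://localhost//')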
Credit to these guys (Caktus Consulting Group) and their django-project-template, because if it wasn't for them I wouldn't have found the answer. Thanks.
Upvotes: 6
Reputation: 1540
Try something like this. It's working in Celery 3.1; the import should happen inside the save method, after the super() call:
from django.db import models
class News(models.Model):
    (...)
    def save(self, *args, **kwargs):
        (...)
        super(News, self).save(*args, **kwargs)
        from news.tasks import update_news_status
        update_news_status.apply_async((self.id,))  # apply_async or delay
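Since the original goal was to update the record on a specific date, note that apply_async also accepts an eta (or countdown) argument; a minimal sketch of the call inside save(), assuming a hypothetical publish_date DateTimeField on News (that field name is an assumption, not from the question):
from news.tasks import update_news_status
# Schedule the task for a specific datetime instead of running it right away;
# `self.publish_date` is an assumed field used only for illustration.
update_news_status.apply_async((self.id,), eta=self.publish_date)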
Upvotes: 0