Carla Urrea Stabile

Reputation: 869

Can't import models in tasks.py with Celery + Django

I want to create a background task to update a record on a specific date. I'm using Django and Celery with RabbitMQ.

I've managed to get the task called when the model is saved with this dummy task function:

tasks.py

from __future__ import absolute_import
from celery import Celery
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)

app = Celery('tasks', broker='amqp://localhost//')


@app.task(name='news.tasks.update_news_status')
def update_news_status(news_id):
    # (I pass the news id and return it, nothing complicated about it)
    return news_id

This task is called from the save() method in my models.py:

from django.db import models
from celery import current_app


class News(models.Model):
    (...)

    def save(self, *args, **kwargs):

        current_app.send_task('news.tasks.update_news_status', args=(self.id,))

        super(News, self).save(*args, **kwargs)

The thing is, I want to import my News model in tasks.py, but if I try it like this:

from .models import News

I get this error:

django.core.exceptions.ImproperlyConfigured: Requested setting DEFAULT_INDEX_TABLESPACE, but settings are not configured. You must either define the environment variable DJANGO_SETTINGS_MODULE or call settings.configure() before accessing settings.

This is what my celery.py looks like:

from __future__ import absolute_import, unicode_literals
from celery import Celery
import os
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myapp.settings')
app = Celery('myapp')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()

@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

I have already tried this:

  1. can't import django model into celery task
  2. making the import inside the task method, as in Django and Celery, AppRegisteredNotReady exception
  3. the approach in Celery - importing models in tasks.py
  4. creating a utils.py and importing it, which was not possible either.

and ran into different errors, but in the end I'm not able to import any module in tasks.py.

There might be something wrong with my config, but I can't see the error; I followed the steps in the Celery docs: First steps with Django.

Also, my project structure looks like this:

├── myapp
│   ├── __init__.py
│   ├── celery.py
│   ├── settings.py
│   ├── urls.py
│   └── wsgi.py
├── news
│   ├── __init__.py
│   ├── admin.py
│   ├── apps.py
│   ├── tasks.py
│   ├── urls.py
│   ├── models.py
│   └── views.py
└── manage.py

I'm executing the worker from myapp directory like this:

celery -A news.tasks worker --loglevel=info

What am I missing here? Thanks in advance for your help!


EDIT

After making the changes suggested in the comments (adding app.autodiscover_tasks(lambda: settings.INSTALLED_APPS) to celery.py and doing the import inside the task method), tasks.py looks like this:

from __future__ import absolute_import
from celery import Celery
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)

app = Celery('tasks', broker='amqp://localhost//')


@app.task(name='news.tasks.update_news_status')
def update_news_status(news_id):
    from .models import News
    return news_id

I get the following error:

[2018-07-20 12:24:29,337: ERROR/ForkPoolWorker-1] Task news.tasks.update_news_status[87f9ec92-c260-4ee9-a3bc-5f684c819f79] raised unexpected: ValueError('Attempted relative import in non-package',)
Traceback (most recent call last):
  File "/Users/carla/Develop/App/backend/myapp-venv/lib/python2.7/site-packages/celery/app/trace.py", line 382, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/Users/carla/Develop/App/backend/myapp-venv/lib/python2.7/site-packages/celery/app/trace.py", line 641, in __protected_call__
    return self.run(*args, **kwargs)
  File "/Users/carla/Develop/App/backend/news/tasks.py", line 12, in update_news_status
    from .models import News
ValueError: Attempted relative import in non-package
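The error in the traceback above is mechanical: since the worker is started against news.tasks directly, tasks.py is loaded as a top-level module with no parent package, so .models has nothing to be relative to; an absolute import such as from news.models import News sidesteps it. A stdlib-only sketch reproducing the failure (the 'tasks' name below is illustrative; Python 2, as in the traceback, raised ValueError, while Python 3 raises ImportError):

```python
# Reproduce "attempted relative import" without Django or Celery:
# run a relative import in a namespace that, like a module loaded as
# top-level code, has no parent package.
namespace = {'__name__': 'tasks', '__package__': ''}

try:
    exec("from .models import News", namespace)
    failed = False
except ImportError:  # Python 2, as in the traceback, raised ValueError
    failed = True

print(failed)  # True: the relative import cannot be resolved
```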

Upvotes: 4

Views: 7401

Answers (3)

MaximeK

Reputation: 2061

Here is what I would do (Django 1.11 and Celery 4.2): you have a problem in your Celery config, and you are trying to re-declare the Celery instance.

tasks.py

from myapp.celery import app  # contains what you need :)
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)

@app.task(name='news.tasks.update_news_status')
def update_news_status(news_id):
    # (I pass the news id and return it, nothing complicated about it)
    return news_id

celery.py

from __future__ import absolute_import, unicode_literals
from celery import Celery
from django.conf import settings
import os

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "myapp.settings")
app = Celery('myapp', backend='rpc://', broker='amqp://localhost//') # your broker config here
app.config_from_object('django.conf:settings', namespace='CELERY') # change here
app.autodiscover_tasks()

models.py

from django.db import models

class News(models.Model):
    (...)
    def save(self, *args, **kwargs):
        super(News, self).save(*args, **kwargs)
        from news.tasks import update_news_status
        update_news_status.delay(self.id) # change here

And launch it with celery -A myapp worker --loglevel=info, because your app is defined in myapp.celery, so the -A parameter needs to be the app where the config is declared.

Upvotes: 0

Carla Urrea Stabile

Reputation: 869

Ok, so for anyone struggling with this... it turns out my celery.py wasn't reading env variables from the settings.

After a week and lots of research I realised that Celery is not a process of Django but a process running outside of it (duh), so when I tried to load the settings they were loaded, but then I wasn't able to access the env variables I had defined in my .env (I use the dotenv library). Celery was trying to look up the env variables in my .bash_profile (of course).

So in the end my solution was to create a helper module in the same directory where my celery.py is defined, called load_env.py with the following

from os.path import dirname, join
import dotenv


def load_env():
    "Get the path to the .env file and load it."
    project_dir = dirname(dirname(__file__))
    dotenv.read_dotenv(join(project_dir, '.env'))

and then in my celery.py (note the last import and the first instruction):

from __future__ import absolute_import, unicode_literals
from celery import Celery
from django.conf import settings
import os
from .load_env import load_env

load_env()

# set the default Django settings module for the 'celery' program.
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "myapp.settings")

app = Celery('myapp')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.

app.config_from_object('myapp.settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

After the call to load_env(), the env variables are loaded and the Celery worker has access to them. By doing this I am now able to access other modules from my tasks.py, which was my main problem.
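As a rough illustration of the pattern (not the actual project code), what load_env() does can be sketched with the stdlib alone; the parsing below only handles plain KEY=value lines, whereas dotenv's read_dotenv also deals with quoting and other edge cases, and the CELERY_BROKER_URL key is a made-up example:

```python
import os
import tempfile


def load_env(path):
    """Read simple KEY=value lines from a .env file into os.environ."""
    with open(path) as fh:
        for line in fh:
            line = line.strip()
            if not line or line.startswith('#') or '=' not in line:
                continue
            key, _, value = line.partition('=')
            # setdefault: values already present in the environment win
            os.environ.setdefault(key.strip(), value.strip())


# Write a throwaway .env for the example and load it
with tempfile.NamedTemporaryFile('w', suffix='.env', delete=False) as fh:
    fh.write('# broker settings\nCELERY_BROKER_URL=amqp://localhost//\n')
    env_path = fh.name

load_env(env_path)
print(os.environ['CELERY_BROKER_URL'])  # amqp://localhost//
```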

Credit to these guys (Caktus Consulting Group) and their django-project-template, because if it wasn't for them I wouldn't have found the answer. Thanks.

Upvotes: 6

giveJob

Reputation: 1540

Try something like this. It works in Celery 3.1; the import should happen inside the save() method, after the call to super().

from django.db import models



class News(models.Model):
    (...)

    def save(self, *args, **kwargs):
        (...)
        super(News, self).save(*args, **kwargs)
        from news.tasks import update_news_status
        update_news_status.apply_async((self.id,)) #apply_async or delay
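Since the original goal was to update a record on a specific date, it is worth noting that apply_async also accepts an eta (absolute datetime) or countdown (seconds) argument to delay execution. A sketch only: update_news_status and a running worker/broker are assumed, so the actual calls are left commented out.

```python
from datetime import datetime, timedelta

# Run the task roughly one week from now
publish_at = datetime.utcnow() + timedelta(days=7)

# update_news_status.apply_async((self.id,), eta=publish_at)
# ...or, relative to now:
# update_news_status.apply_async((self.id,), countdown=7 * 24 * 3600)

print(publish_at > datetime.utcnow())  # True
```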

Upvotes: 0
