arlyon
arlyon

Reputation: 135

Celery beat periodic tasks fail when the app/tasks.py imports a Model

I have a program structure identical to the celery django demo that works fine. The problem is that in my demoapp/tasks.py analogue I import a model (from .models import Service) which triggers

django.core.exceptions.AppRegistryNotReady: Apps aren't loaded yet.

Strange to say the least. The entire point of the schedule is to update the database by sending a request to an api. Upon taking out the import statement and commenting out appropriate code the task runs as expected. Here is the code for the model, however the error occurs no matter how many fields the model have, making me believe the error isn't due to the way the model is programmed. I am tracking trains.

from django.db import models

# Create your models here.

class Service(models.Model):
    id = models.TextField(primary_key=True)
    scheduled = models.DateTimeField()
    expected = models.DateTimeField(null=True, blank=True) # null means cancelled
    service = models.TextField()
    platform = models.TextField()
    origin = models.TextField()
    dest = models.TextField()

EDIT: here is my demoapp/tasks.py file:

# Create your tasks here
from __future__ import absolute_import, unicode_literals
from celery import shared_task
import requests
from .models import Service
from django.db import transaction
import datetime
import pytz

def clean(entry):
    def remove_location_lists(i):
        """takes care of services with multiple origins or destinations, taking only the first of each."""
        if type(i["origin"]["location"]) is list:
            i["origin"] = i["origin"]["location"][0]["locationName"]
        else:
            i["origin"] = i["origin"]["location"]["locationName"]

        if type(i["dest"]["location"]) is list:
            i["dest"] = i["dest"]["location"][0]["locationName"]
        else:
            i["dest"] = i["dest"]["location"]["locationName"]

        return i

    def parse_time(i):
        """converts the raw time string to a datetime object, and includes some logic to manage crossing midnight."""
        if i["expected"] == "Cancelled":
            i["service"] = "cancelled"
            i["expected"] = None
        elif i["expected"] == "Delayed":  # blank
            i["service"] = "delayed"
            i["expected"] = None
        else:
            i["service"] = "ontime"
            i["expected"] = pytz.utc.localize(datetime.datetime.strptime(i["scheduled"], '%H:%M'))

        i["scheduled"] = pytz.utc.localize(datetime.datetime.strptime(i["scheduled"], '%H:%M'))
        i["scheduled"] = i["scheduled"].replace(day=datetime.date.today().day, month=datetime.date.today().month,
                                                year=datetime.date.today().year)

        try:
            i["expected"] = i["expected"].replace(day=datetime.date.today().day, month=datetime.date.today().month,
                                                  year=datetime.date.today().year)

            # include day detection logic!

        except Exception as inst:
            pass

        return i

    entry = remove_location_lists(entry)
    entry = parse_time(entry)

    return entry

@shared_task
def get_trips():
    print("run task")
    url = "http://www.southernrailway.com/ajax/departures/json/"
    get = {'from': 'GTW', 'to': '', 'id': ''}

    r = requests.post(url, get)
    json = r.json()

    unsaved = []

    for i in json["arrayServices"]:
        entry = {'id': i["serviceID"],
                 'scheduled': i["std"],
                 'expected': i["etd"],
                 'service': i["service"],
                 'platform': i["platform"],
                 'origin': i["origin"],
                 'dest': i["destination"]}

        entry = clean(entry)

        entry = Service(id=entry['id'], scheduled=entry['scheduled'], expected=entry["expected"],
                        service=entry['service'], platform=entry['platform'], origin=entry['origin'],
                        dest=entry['dest'])

        unsaved.append(entry)

    with transaction.atomic():
        for i in unsaved:
            i.save()

And, for good measure, here are the contents of my celery.py:

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
from shitternrailways.tasks import get_trips

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'arlyon.settings')

app = Celery('arlyon')

# Using a string here means the worker don't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    sender.add_periodic_task(1, get_trips.s(), name="get trips")

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()

@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

and finally the full traceback:

/Users/v/Documents/programming/venvs/arlyon/bin/python /Users/v/Documents/programming/arlyon/manage.py runserver 8000
Traceback (most recent call last):
  File "/Users/v/Documents/programming/arlyon/manage.py", line 22, in <module>
    execute_from_command_line(sys.argv)
  File "/Users/v/Documents/programming/venvs/arlyon/lib/python3.5/site-packages/django/core/management/__init__.py", line 353, in execute_from_command_line
    utility.execute()
  File "/Users/v/Documents/programming/venvs/arlyon/lib/python3.5/site-packages/django/core/management/__init__.py", line 302, in execute
    settings.INSTALLED_APPS
  File "/Users/v/Documents/programming/venvs/arlyon/lib/python3.5/site-packages/django/conf/__init__.py", line 55, in __getattr__
    self._setup(name)
  File "/Users/v/Documents/programming/venvs/arlyon/lib/python3.5/site-packages/django/conf/__init__.py", line 43, in _setup
    self._wrapped = Settings(settings_module)
  File "/Users/v/Documents/programming/venvs/arlyon/lib/python3.5/site-packages/django/conf/__init__.py", line 99, in __init__
    mod = importlib.import_module(self.SETTINGS_MODULE)
  File "/Users/v/Documents/programming/venvs/arlyon/lib/python3.5/importlib/__init__.py", line 126, in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
  File "<frozen importlib._bootstrap>", line 986, in _gcd_import
  File "<frozen importlib._bootstrap>", line 969, in _find_and_load
  File "<frozen importlib._bootstrap>", line 944, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 222, in _call_with_frames_removed
  File "<frozen importlib._bootstrap>", line 986, in _gcd_import
  File "<frozen importlib._bootstrap>", line 969, in _find_and_load
  File "<frozen importlib._bootstrap>", line 958, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 673, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 665, in exec_module
  File "<frozen importlib._bootstrap>", line 222, in _call_with_frames_removed
  File "/Users/v/Documents/programming/arlyon/arlyon/__init__.py", line 5, in <module>
    from .celery import app as celery_app
  File "/Users/v/Documents/programming/arlyon/arlyon/celery.py", line 4, in <module>
    from shitternrailways.tasks import get_trips
  File "/Users/v/Documents/programming/arlyon/shitternrailways/tasks.py", line 5, in <module>
    from .models import Service
  File "/Users/v/Documents/programming/arlyon/shitternrailways/models.py", line 5, in <module>
    class Service(models.Model):
  File "/Users/v/Documents/programming/venvs/arlyon/lib/python3.5/site-packages/django/db/models/base.py", line 94, in __new__
    app_config = apps.get_containing_app_config(module)
  File "/Users/v/Documents/programming/venvs/arlyon/lib/python3.5/site-packages/django/apps/registry.py", line 239, in get_containing_app_config
    self.check_apps_ready()
  File "/Users/v/Documents/programming/venvs/arlyon/lib/python3.5/site-packages/django/apps/registry.py", line 124, in check_apps_ready
    raise AppRegistryNotReady("Apps aren't loaded yet.")
django.core.exceptions.AppRegistryNotReady: Apps aren't loaded yet.

Process finished with exit code 1

Any searching has revealed little in the way of help, probably attributable to how young celery 4 is. So I turned to you guys. Thanks for any tips you have.

Upvotes: 1

Views: 1105

Answers (1)

Alexander Tyapkov
Alexander Tyapkov

Reputation: 5017

You are using @shared_task decorator for the task. It means that task is not attached to any application. However in celery.py you are trying to use this task for the beat scheduler and it throws you error.

I see two possible ways to solve it. Firstly you can move this task from demoapp into celery.py and use it with @app.task decorator.

Secondly you can try to access the task from app. For that you have called app.autodiscover_tasks(). It means that tasks are loaded into app. So you can try to:

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    sender.add_periodic_task(1, app.tasks["demoapp.get_trips"], name="get trips")

However it needs to be tested. I didn't try it in my machine. Good luck!

Upvotes: 1

Related Questions