tsantor

Reputation: 544

Celery / RabbitMQ / Django not running tasks

I am hoping someone can help me, as I've searched Stack Overflow and cannot find a solution to my problem. I am running a Django project with Supervisor, RabbitMQ and Celery installed. RabbitMQ is up and running, and Supervisor is keeping my celerybeat running; however, while celerybeat logs that it has started and sends tasks every 5 minutes (see below), the tasks never actually execute:

My supervisor program conf:

[program:nrv_twitter]
; Set full path to celery program if using virtualenv
command=/Users/tsantor/.virtualenvs/nrv_env/bin/celery beat -A app --loglevel=INFO --pidfile=/tmp/nrv-celerybeat.pid --schedule=/tmp/nrv-celerybeat-schedule

; Project dir
directory=/Users/tsantor/Projects/NRV/nrv

; Logs
stdout_logfile=/Users/tsantor/Projects/NRV/nrv/logs/celerybeat_twitter.log
redirect_stderr=true

autorestart=true
autostart=true
startsecs=10
user=tsantor

; if rabbitmq is supervised, set its priority higher so it starts first
priority=999

Here is the output of the log from the program above:

[2014-12-16 20:29:42,293: INFO/MainProcess] beat: Starting...
[2014-12-16 20:34:08,161: INFO/MainProcess] Scheduler: Sending due task gettweets-every-5-mins (twitter.tasks.get_tweets)
[2014-12-16 20:39:08,186: INFO/MainProcess] Scheduler: Sending due task gettweets-every-5-mins (twitter.tasks.get_tweets)
[2014-12-16 20:44:08,204: INFO/MainProcess] Scheduler: Sending due task gettweets-every-5-mins (twitter.tasks.get_tweets)
[2014-12-16 20:49:08,205: INFO/MainProcess] Scheduler: Sending due task gettweets-every-5-mins (twitter.tasks.get_tweets)
[2014-12-16 20:54:08,223: INFO/MainProcess] Scheduler: Sending due task gettweets-every-5-mins (twitter.tasks.get_tweets)

Here is my celery.py settings file:

from datetime import timedelta

BROKER_URL = 'amqp://guest:guest@localhost//'

CELERY_DISABLE_RATE_LIMITS = True

CELERYBEAT_SCHEDULE = {
    'gettweets-every-5-mins': {
        'task': 'twitter.tasks.get_tweets',
        'schedule': timedelta(seconds=300),  # 300 seconds = every 5 minutes
    },
}
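As a sanity check on the interval above: timedelta(seconds=300) is exactly the same interval as timedelta(minutes=5), so either spelling works as the 'schedule' value:

```python
from datetime import timedelta

# Both spellings describe the same 5-minute interval, so either can
# be used as the 'schedule' value in CELERYBEAT_SCHEDULE.
assert timedelta(seconds=300) == timedelta(minutes=5)
print(timedelta(minutes=5).total_seconds())  # 300.0
```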

Here is my celeryapp.py:

from __future__ import absolute_import
import os
from django.conf import settings
from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'app.settings')

app = Celery('app')
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

Here is my twitter/tasks.py:

from __future__ import absolute_import
import logging
from celery import shared_task
from twitter.views import IngestTweets

log = logging.getLogger('custom.log')

@shared_task
def get_tweets():
    """
    Get tweets and save them to the DB
    """
    instance = IngestTweets()
    instance.get_new_tweets()

    log.info('Successfully ingested tweets via celery task')
    return True

The get_tweets task never executes; however, I know the underlying code works, as I can run get_tweets manually and it works fine.
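For anyone debugging the same symptom: calling the function directly bypasses Celery entirely, while only .delay() / .apply_async() publish a message to the broker and therefore require a worker to be running. A quick way to tell the two apart, assuming a Django shell in this project:

```python
# In a Django shell (python manage.py shell), assuming this project's layout:
from twitter.tasks import get_tweets

get_tweets()                  # runs synchronously in this process; no broker or worker involved
result = get_tweets.delay()   # publishes a message to RabbitMQ; a worker must consume it
```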

I have spent two days trying to figure out why it is sending due tasks but not executing them. Any help is greatly appreciated. Thanks in advance.

Upvotes: 3

Views: 2163

Answers (2)

lofidevops

Reputation: 17032

You need to start both a worker process and a beat process. You can create separate processes as described in tsantor's answer, or you can create a single process that runs both a worker and a beat. This can be more convenient during development (but is not recommended for production).

From "Starting the scheduler" in the Celery documentation:

You can also embed beat inside the worker by enabling the workers -B option, this is convenient if you’ll never run more than one worker node, but it’s not commonly used and for that reason isn’t recommended for production use:

$ celery -A proj worker -B

For example Supervisor config files, see https://github.com/celery/celery/tree/master/extra/supervisord/ (linked from "Daemonization" in the Celery documentation).
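A minimal Supervisor program for the combined worker-plus-beat process might look like this (a sketch only; the paths are borrowed from the question, so adjust them for your environment):

```ini
[program:nrv_celery]
; Single process running both a worker and an embedded beat (-B).
; Convenient for development; not recommended for production.
command=/Users/tsantor/.virtualenvs/nrv_env/bin/celery worker -A app -B --loglevel=INFO
directory=/Users/tsantor/Projects/NRV/nrv
stdout_logfile=/Users/tsantor/Projects/NRV/nrv/logs/celery.log
redirect_stderr=true
autostart=true
autorestart=true
user=tsantor
```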

Upvotes: 0

tsantor

Reputation: 544

user2097159, thanks for pointing me in the right direction; I was not aware that I also had to run a worker via Supervisor. I thought it was either a worker or a beat, but now I understand that I need a worker to handle the tasks and a beat to fire them off periodically.

Below is the missing worker config for supervisor:

[program:nrv_celery_worker]
; Worker
command=/Users/tsantor/.virtualenvs/nrv_env/bin/celery worker -A app --loglevel=INFO

; Project dir
directory=/Users/tsantor/Projects/NRV/nrv

; Logs
stdout_logfile=/Users/tsantor/Projects/NRV/nrv/logs/celery_worker.log
redirect_stderr=true

autostart=true
autorestart=true
startsecs=10
user=tsantor
numprocs=1

; Need to wait for currently executing tasks to finish at shutdown.
; Increase this if you have very long running tasks.
stopwaitsecs = 600

; When resorting to send SIGKILL to the program to terminate it
; send SIGKILL to its whole process group instead,
; taking care of its children as well.
killasgroup=true

; if rabbitmq is supervised, set its priority higher
; so it starts first
priority=998

I then reset the RabbitMQ queue. Now that I have both the beat and worker programs managed via supervisor, all is working as intended. Hope this helps someone else out.
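For reference, the queue reset I mention can be done with either of these commands (assuming the default queue name, 'celery'; the -f flag skips the confirmation prompt):

```shell
# Discard all pending messages for this Celery app
celery -A app purge -f

# Or purge the default queue directly via RabbitMQ
rabbitmqctl purge_queue celery
```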

Upvotes: 2
