Prometheus
Prometheus

Reputation: 33625

How to check if Celery/Supervisor is running using Python

How to write a script in Python that outputs if celery is running on a machine (Ubuntu)?

My use-case. I have a simple python file with some tasks. I'm not using Django or Flask. I use supervisor to run the task queue. For example,

tasks.py

from celery import Celery, task
app = Celery('tasks')
@app.task()
def add_together(a, b):
    return a + b

Supervisor:

[program:celery_worker]
directory = /var/app/
command=celery -A tasks worker info

This all works, I now want to have page which checks if celery/supervisor process is running. i.e. something like this maybe using Flask allowing me to host the page giving a 200 status allowing me to load balance.

For example...

check_status.py

from flask import Flask

app = Flask(__name__)

@app.route('/')
def status_check():

    #check supervisor is running
    if supervisor:
         return render_template('up.html')
    else:
        return render_template('down.html')

if __name__ == '__main__':
    app.run()

Upvotes: 19

Views: 9879

Answers (7)

vgel
vgel

Reputation: 3335

Update 09/2020: Jérôme updated this answer for Celery 4.3 here: https://stackoverflow.com/a/57628025/1159735

You can run the celery status command via code by importing the celery.bin.celery package:

import celery
import celery.bin.base
import celery.bin.celery
import celery.platforms

app = celery.Celery('tasks', broker='redis://')

status = celery.bin.celery.CeleryCommand.commands['status']()
status.app = status.get_app()

def celery_is_up():
    try:
        status.run()
        return True
    except celery.bin.base.Error as e:
        if e.status == celery.platforms.EX_UNAVAILABLE:
            return False
        raise e

if __name__ == '__main__':
    if celery_is_up():
        print('Celery up!')
    else:
        print('Celery not responding...')

Upvotes: 26

Jérôme
Jérôme

Reputation: 14674

Inspired by @vgel's answer, using Celery 4.3.0.

import celery
import celery.bin.base
import celery.bin.control
import celery.platforms

# Importing Celery app from my own application
from my_app.celery import app as celery_app


def celery_running():
    """Test Celery server is available

    Inspired by https://stackoverflow.com/a/33545849
    """
    status = celery.bin.control.status(celery_app)
    try:
        status.run()
        return True
    except celery.bin.base.Error as exc:
        if exc.status == celery.platforms.EX_UNAVAILABLE:
            return False
        raise


if __name__ == '__main__':
    if celery_is_up():
        print('Celery up!')
    else:
        print('Celery not responding...')

Upvotes: 2

Tom
Tom

Reputation: 31

This isn't applicable for celery, but for anyone that ended up here to see if supervisord is running, check to see if the pidfile defined for supervisord in your supervisord.conf configuration file exists. If so, it's running; if not, it isn't. The default pidfile is /tmp/supervisord.pid, which is what I used below.

import os
import sys

if os.path.isfile("/tmp/supervisord.pid"):
    print "supervisord is running."
    sys.exit()

Upvotes: 0

gipsy
gipsy

Reputation: 3859

A sparse web user interface comes with supervisor. May be you could use that. It can be enabled in the supervisor config. Key to look for is [inet_http_server]

You could even look at the source code of that piece to get ideas to implement your own.

Upvotes: 1

r-m-n
r-m-n

Reputation: 15100

you can parse process state from supervisorctl status output

import subprocess

def is_celery_worker_running():
    ctl_output = subprocess.check_output('supervisorctl status celery_worker'.split()).strip()
    if ctl_output == 'unix:///var/run/supervisor.sock no such file':
        # supervisord not running
        return False
    elif ctl_output == 'No such process celery_worker':
        return False
    else:
        state = ctl_output.split()[1]
        return state == 'RUNNING'

Upvotes: 2

hematag
hematag

Reputation: 7

In my experience, I'd set a message to track whether it was complete or not so that the queues would be responsible for retrying tasks.

Upvotes: -2

lord63. j
lord63. j

Reputation: 4670

How about using subprocess, not sure if it is a good idea:

>>> import subprocess
>>> output = subprocess.check_output('ps aux'.split())
>>> 'supervisord' in output
True

Upvotes: 2

Related Questions