Aditya Patel

Reputation: 168

Assistance with Celery Logging

So Celery is a super great library, but its documentation for the logging section isn't the best, which brings me here to ask for assistance.

My script as of now looks like this (well, in summary):

import logging
import timber
from celery import Celery
from celery.utils.log import get_logger
from task import process
import config

logger = get_logger(__name__)
timber_handler = timber.TimberHandler(api_key=config.key,
                                      level=logging.INFO)
logger.addHandler(timber_handler)



app = Celery('task',
             broker=config.url,
             backend='rpc://')

@app.task
def run_task():
    status = get_status() # get alive or dead status
    if status == 1:
        logger.info("Task is running")
        process()


@app.on_after_configure.connect
def task_periodic(sender, **kwargs):
    sender.add_periodic_task(2.0, run_task.s(), name="Run Constantly")
    # More tasks

The process function in task.py is a very basic function that hits a few APIs and databases for some info, and I want to log that to timber.io, a service that attaches to the Python logging library and stores logs online.

However, my major issue is that the logs are getting sent to stdout and not to timber. I have looked at celery.signals, but the documentation isn't great. Any assistance here would be greatly appreciated. Thank you.

Upvotes: 8

Views: 24764

Answers (1)

Bjoern Stiel

Reputation: 4181

Can you try this?

import logging
import os
from celery import Celery
from celery.signals import after_setup_logger

app = Celery('app')
app.conf.update({
    'broker_url': 'filesystem://',
    'broker_transport_options': {
        # in and out share a folder so the producer and the
        # worker see the same queue
        'data_folder_in': './broker/out',
        'data_folder_out': './broker/out',
        'data_folder_processed': './broker/processed'
    },
    'result_persistent': False,
    'task_serializer': 'json',
    'result_serializer': 'json',
    'accept_content': ['json']})


logger = logging.getLogger(__name__)


# make sure the broker folders exist before the worker starts
for f in ['./broker/out', './broker/processed']:
    os.makedirs(f, exist_ok=True)

@after_setup_logger.connect
def setup_loggers(logger, *args, **kwargs):
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    # add filehandler
    fh = logging.FileHandler('logs.log')
    fh.setLevel(logging.DEBUG)
    fh.setFormatter(formatter)
    logger.addHandler(fh)


@app.task()
def add(x, y):
    logger.info('Found addition')
    logger.info('Added {0} and {1}'.format(x, y))
    return x+y


if __name__ == '__main__':
    task = add.s(x=2, y=3).delay()

Start the worker like this:

celery worker --app=app.app --concurrency=1 --loglevel=INFO

And kick off the task asynchronously:

python app.py

I've changed it so it's a stand-alone script that just uses the filesystem as a message broker (also, I've deliberately replaced the timber.io handler with a filehandler).

This writes the logs to logs.log (replace the filehandler with the timber.io handler and that should solve your issue).
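For example, here's a minimal sketch of that swap, reusing the handler construction from your question (I'm assuming timber.TimberHandler takes api_key and level exactly as in your snippet):

import logging
import timber
from celery.signals import after_setup_logger
import config  # assumed to hold your timber API key, as in your question

@after_setup_logger.connect
def setup_loggers(logger, *args, **kwargs):
    # attach the timber.io handler instead of (or alongside) the FileHandler
    timber_handler = timber.TimberHandler(api_key=config.key,
                                          level=logging.INFO)
    logger.addHandler(timber_handler)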

I had a bit of a hard time as I couldn't get it working with worker_hijack_root_logger=False and a custom logger defined in setup_logging.

However, after revisiting the docs, I came to the conclusion that it's a better option not to override the logger but just to augment it:

If you’d like to augment the logging configuration setup by Celery then you can use the after_setup_logger and after_setup_task_logger signals.

See also: http://docs.celeryproject.org/en/latest/userguide/signals.html#after-setup-logger
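Along the same lines, if you also want the logs emitted inside your tasks to go through the same handler, here's a minimal sketch using the after_setup_task_logger signal from that page (same assumptions as above):

import logging
from celery.signals import after_setup_task_logger

@after_setup_task_logger.connect
def setup_task_loggers(logger, *args, **kwargs):
    # same idea: augment Celery's task logger rather than replacing it
    fh = logging.FileHandler('logs.log')  # or swap in the timber handler here too
    fh.setLevel(logging.DEBUG)
    logger.addHandler(fh)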

Upvotes: 6
