simon
simon

Reputation: 2821

How do I configure logging in Luigi with multiple workers?

Whilst running an application from Luigi I have logging directed to stdout and a file. This works fine with workers=1. However as soon as I set workers=4 I get no application logging.

Is logging possible with multiple workers?

Upvotes: 1

Views: 2326

Answers (1)

simon
simon

Reputation: 2821

Seems there is no simple solution in Luigi but the following works. Only thing is you have to remember to call "enable" at the start of every run method. I have not found a solution to avoid this.

import logging
import luigi
from luigi.interface import build, setup_interface_logging

from multiprocessing import Queue
from logging.handlers import QueueHandler, QueueListener

########### enable multiprocess logging ############################

q = Queue()

def run(tasks, *args, **kwargs):
    """ run tasks with queuelistener to handle logging """
    setup_interface_logging.has_run = True
    workers = kwargs.get("workers", 1) > 1
    if workers:
        log=logging.getLogger()
        listener = QueueListener(q, *log.handlers)
        listener.start()
        build(tasks, *args, **kwargs)
        listener.stop()
    else:
        build(tasks, *args, **kwargs)

class Task(luigi.Task):
    """ redirect logging to queue """
    def __init__(self):
        """ add q to process """
        super().__init__()
        self.q = q

    def enable(self):
        """ call at start of each run process to initialise settings and redirect logging to queue """
        
        # for 1 worker leave settings alone
        log = logging.getLogger()
        if log.handlers:
            return

        # for multiple workers load settings but replace handlers with queue
        from logcon import log
        log.handlers = []
        log.addHandler(QueueHandler(self.q))

######################################################################

class Test(Task):

    def complete(self):
        return False

    def run(self):
        self.enable()

        log = logging.getLogger()
        log.debug("running")
        log.info("running")
        log.warning("running")

        log = logging.getLogger("runlog")
        log.debug("running")
        log.info("running")
        log.warning("running")

        log = logging.getLogger("luigi-interface")
        log.debug("running")
        log.info("running")
        log.warning("running")

    def requires(self):
        return []

Upvotes: 1

Related Questions