Constantino

Reputation: 2391

Luigi flexible pipeline and passing parameters all the way through

I've recently implemented a luigi pipeline to handle the processing for one of our bioinformatics pipelines. However, there's something fundamental about how to set up these tasks that I'm not grasping.

Let's say I've got a chain of three tasks that I'd like to be able to run with multiple workers. For example, the dependency graph for three workers might look like:

/ taskC -> taskB -> taskA
- taskC -> taskB -> taskA
\ taskC -> taskB -> taskA

and I might write

import os

import luigi


class entry(luigi.Task):

    in_dir = luigi.Parameter()

    def requires(self):
        # one taskC chain per file in in_dir
        for f in os.listdir(self.in_dir):
            yield taskC(pass_through=f)

    def run(self):
        # some logic using self.input(), i.e. the output
        # of each taskC yielded above
        ...

class taskA(luigi.Task):

    in_file_A = luigi.Parameter()

    def output(self):
        return luigi.LocalTarget('outA.txt')

    def run(self):
        # some logic generating outA.txt
        ...

class taskB(luigi.Task):

    pass_through = luigi.Parameter()

    def output(self):
        return luigi.LocalTarget('outB.txt')

    def requires(self):
        return taskA(in_file_A=self.pass_through)

    def run(self):
        # some logic using self.input().path [outA.txt]
        # and generating self.output().path [outB.txt]
        ...

class taskC(luigi.Task):

    pass_through = luigi.Parameter()

    def output(self):
        return luigi.LocalTarget('outC.txt')

    def requires(self):
        return taskB(pass_through=self.pass_through)

    def run(self):
        # some logic using self.input().path [outB.txt]
        # and generating self.output().path [outC.txt]
        ...

If my code lives in pipeline.py I might launch this with:

luigi --module pipeline entry --workers 3 --in-dir some_dir_w_input_files/

The fact that I'm sending the parameter pass_through all the way through to taskA doesn't feel like the right approach. Furthermore, if at some point in the future I already have the data generated (separately) by taskA, taskB isn't flexible enough to handle that situation. Perhaps I might write:

class taskB(luigi.Task):

    in_file_B = luigi.Parameter(default=None)     # if we already have the output of taskA
    pass_through = luigi.Parameter(default=None)  # if we require taskA

    def output(self):
        return luigi.LocalTarget('outB.txt')

    def requires(self):
        if self.pass_through:
            return taskA(in_file_A=self.pass_through)

    def run(self):
        if self.input():
            logic_input = self.input().path  # outA.txt, produced by taskA
        else:
            logic_input = self.in_file_B     # pre-existing taskA output

        # some logic using logic_input
        # and generating self.output().path [outB.txt]
        ...

I'd like to know if this is the 'proper' design pattern for Luigi or if I'm completely off base.

Upvotes: 1

Views: 1777

Answers (2)

I run highly parameterized luigi workflows. I've come up with two approaches to solve this problem.

The first approach is to create a container class that holds all parameters for the workflow, and have the Tasks inherit them. You can then pass **self.param_kwargs around the dependency graph indiscriminately, more or less:

import os

import luigi


class JobParams:
    # plain container/mixin class: luigi picks up Parameter
    # class attributes through inheritance
    in_dir = luigi.Parameter()
    in_file_A = luigi.Parameter()
    pass_through = luigi.Parameter()


class entry(luigi.Task, JobParams):

    def requires(self):
        for f in os.listdir(self.in_dir):
            yield taskC(**self.param_kwargs)

etc. The downside to this approach is that the more one modularizes the code, the more one runs into design issues like unwieldy inheritance hierarchies and linters flagging top-level classes for having too many ancestors. However, this may be a workable solution depending on your implementation.
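
For concreteness, a rough sketch of how the rest of the chain from the question could forward the shared parameters under this approach (extrapolating the same pattern; it assumes taskA, taskB and taskC all inherit JobParams as well):

class taskB(luigi.Task, JobParams):

    def output(self):
        return luigi.LocalTarget('outB.txt')

    def requires(self):
        # taskA inherits the same JobParams, so the full kwargs line up
        return taskA(**self.param_kwargs)


class taskC(luigi.Task, JobParams):

    def output(self):
        return luigi.LocalTarget('outC.txt')

    def requires(self):
        return taskB(**self.param_kwargs)

Each class still only uses the parameters relevant to it; the rest simply ride along in param_kwargs.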

The more flexible solution I found was to patch all of the task classes in a workflow so that they share parameters. This way, each task can be modularized and defined elsewhere, without dependencies (or with default dependencies) and with only the parameters it needs for its own operations, and dependencies and params can then be managed at the workflow level.

import luigi
import yaml

from common.tasks import TaskA, TaskB, TaskC

def share_attributes(objs: list | set, attr_class):
    """Shares attribute of specified class/parent class among all objects.

    Args:
        objs: Objects or classes to share attributes.
        attr_class: Class or parent class of attributes to share.
    """
    attr_dict = {}

    for obj in objs:
        # Iterate through objects and match all instances of `attr_class`
        for attr_name in dir(obj):
            attr_value = getattr(obj, attr_name)
            # Will always keep the most recent value for each name.
            if isinstance(attr_value, attr_class):
                attr_dict[attr_name] = attr_value

    for obj in objs:
        for key, value in attr_dict.items():
            setattr(obj, key, value)

    return objs

share_attributes([TaskA, TaskB, TaskC], luigi.Parameter)

class TaskBWithDeps(TaskB):
    def requires(self):
        return TaskA(**self.param_kwargs)

class TaskCWithDeps(TaskC):
    def requires(self):
        return TaskBWithDeps(**self.param_kwargs)

if __name__ == "__main__":
    with open("my_config_file.yaml", "r", encoding="utf-8") as f:
        params = yaml.safe_load(f)

    luigi.build([TaskCWithDeps(**params)], local_scheduler=True)

This approach shouldn't cause any issues, as long as you're aware of scope: the patching happens at class level, so it affects every subsequent use of those task classes in the process.
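
To make the class-level effect concrete, here is a minimal sketch (the toy task names are mine, not part of the workflow above) of what share_attributes does: after patching, each class carries the union of the Parameters, so **self.param_kwargs from one task can construct any other without missing-parameter errors.

import luigi

class Fetch(luigi.Task):
    url = luigi.Parameter()        # declared only on Fetch

class Parse(luigi.Task):
    out_path = luigi.Parameter()   # declared only on Parse

# after patching, both classes expose `url` and `out_path`
share_attributes([Fetch, Parse], luigi.Parameter)

task = Parse(url="http://example.com", out_path="parsed.txt")
print(task.param_kwargs)  # {'url': 'http://example.com', 'out_path': 'parsed.txt'}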

Upvotes: 0

MattMcKnight

Reputation: 8290

I think this is largely an artifact of the abstract tasks you have here; in the real world, you probably need to know at each step where you are reading from and writing to. See for example:

import os

import luigi


class DecompressTask(luigi.Task):
    dirname = luigi.Parameter()
    filename = luigi.Parameter()

    def output(self):
        return luigi.LocalTarget(os.path.join(self.dirname, self.filename + ".txt"))

    def run(self):
        decompress(os.path.join(self.dirname, self.filename + ".gz"),
                   os.path.join(self.dirname, self.filename + ".txt"))


class TranslateTask(luigi.Task):
    dirname = luigi.Parameter()
    filename = luigi.Parameter()

    def requires(self):
        return DecompressTask(dirname=self.dirname, filename=self.filename)

    def output(self):
        return luigi.LocalTarget(os.path.join(self.dirname, self.filename + ".translated"))

    def run(self):
        translate(os.path.join(self.dirname, self.filename + ".txt"),
                  os.path.join(self.dirname, self.filename + ".translated"))


class ProcessDirectory(luigi.WrapperTask):
    dirname = luigi.Parameter()

    def requires(self):
        tasks = []
        for file_name in os.listdir(self.dirname):
            if file_name.endswith("gz"):
                prefix = file_name.split(".")[0]
                tasks.append(TranslateTask(filename=prefix, dirname=self.dirname))
        return tasks
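
Assuming these tasks live in pipeline.py as in the question (the module name is just a placeholder), this could be launched the same way, with one chain per .gz file:

luigi --module pipeline ProcessDirectory --dirname some_dir_w_input_files/ --workers 3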

Upvotes: 0
