Reputation: 2391
I've recently implemented a luigi pipeline to handle the processing for one of our bioinformatics workflows. However, there's something fundamental about how to set up these tasks that I'm not grasping.
Let's say I've got a chain of three tasks that I'd like to be able to run with multiple workers. For example, the dependency graph for three workers might look like:
/ taskC -> taskB -> taskA
- taskC -> taskB -> taskA
\ taskC -> taskB -> taskA
and I might write:
import os
import luigi


class entry(luigi.Task):
    in_dir = luigi.Parameter()

    def requires(self):
        # one taskC -> taskB -> taskA chain per input file in in_dir
        for f in os.listdir(self.in_dir):
            yield taskC(pass_through=os.path.join(self.in_dir, f))

    def run(self):
        # some logic using the .path of each target in self.input(),
        # one per taskC yielded above
        ...


class taskA(luigi.Task):
    in_file_A = luigi.Parameter()

    def output(self):
        return luigi.LocalTarget('outA.txt')

    def run(self):
        # some logic generating self.output().path [outA.txt]
        ...


class taskB(luigi.Task):
    pass_through = luigi.Parameter()

    def output(self):
        return luigi.LocalTarget('outB.txt')

    def requires(self):
        return taskA(in_file_A=self.pass_through)

    def run(self):
        # some logic using self.input().path [outA.txt]
        # and generating self.output().path [outB.txt]
        ...


class taskC(luigi.Task):
    pass_through = luigi.Parameter()

    def output(self):
        return luigi.LocalTarget('outC.txt')

    def requires(self):
        return taskB(pass_through=self.pass_through)

    def run(self):
        # some logic using self.input().path [outB.txt]
        # and generating self.output().path [outC.txt]
        ...
If my code lives in pipeline.py, I might launch this with:
luigi --module pipeline entry --workers 3 --in-dir some_dir_w_input_files/
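Equivalently, the pipeline could be launched programmatically with luigi.build (a minimal sketch, assuming the classes above are importable from pipeline.py):

import luigi
from pipeline import entry

luigi.build([entry(in_dir='some_dir_w_input_files/')],
            workers=3, local_scheduler=True)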
The fact that I'm sending the pass_through parameter all the way through to taskA doesn't feel like the right approach. Furthermore, if at some point in the future I already have the data generated (separately) by taskA, then taskB isn't flexible enough to handle that situation. Perhaps I might write:
class taskB(luigi.Task):
    in_file_B = luigi.Parameter(default='')     # if we already have the output of taskA
    pass_through = luigi.Parameter(default='')  # if we require taskA

    def output(self):
        return luigi.LocalTarget('outB.txt')

    def requires(self):
        if self.pass_through:
            return taskA(in_file_A=self.pass_through)

    def run(self):
        if self.pass_through:
            logic_input = self.input().path
        else:
            logic_input = self.in_file_B
        # some logic using logic_input
        # and generating self.output().path [outB.txt]
        ...
I'd like to know if this is the 'proper' design pattern for Luigi or if I'm completely off base.
Upvotes: 1
Views: 1777
Reputation: 1
I run highly parameterized luigi workflows. I've come up with two approaches to solve this problem.
The first approach is to create a container class that holds all the parameters for the workflow and have the tasks inherit from it. You can then pass **self.param_kwargs around the dependency graph more or less indiscriminately:
from dataclasses import dataclass

import luigi


@dataclass
class JobParams:
    in_dir = luigi.Parameter()
    in_file_A = luigi.Parameter()
    pass_through = luigi.Parameter()


class entry(luigi.Task, JobParams):
    def requires(self):
        for f in self.in_dir:
            yield taskC(**self.param_kwargs)
etc. The downside to this approach is that the more one modularizes the code, the more one runs into design issues such as unwieldy inheritance hierarchies and linters flagging top-level classes for having too many ancestors. Still, this may be a workable solution depending on your implementation.
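To make the "etc." concrete, here is roughly what one of the downstream tasks from the question might look like under this approach; this is only a sketch, and it assumes taskB (and its upstream tasks) also inherit from JobParams so they accept the full parameter set:

class taskC(luigi.Task, JobParams):
    def output(self):
        return luigi.LocalTarget('outC.txt')

    def requires(self):
        # parameters flow through param_kwargs instead of being named explicitly
        return taskB(**self.param_kwargs)

    def run(self):
        # placeholder: logic using self.input().path, writing self.output().path
        ...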
The more flexible solution I found was to patch all of the task classes in a workflow so that they share parameters. This way, the tasks can be modularized and defined elsewhere, either with no dependencies or with default dependencies and with only the parameters they need for their own operations, while dependencies and parameters are managed at the workflow level.
import luigi
import yaml

from common.tasks import TaskA, TaskB, TaskC


def share_attributes(objs: list | set, attr_class):
    """Shares attributes of the specified class/parent class among all objects.

    Args:
        objs: Objects or classes to share attributes.
        attr_class: Class or parent class of attributes to share.
    """
    attr_dict = {}
    for obj in objs:
        # Iterate through objects and collect all instances of `attr_class`.
        for attr_name in dir(obj):
            attr_value = getattr(obj, attr_name)
            # Will always keep the most recent value for each name.
            if isinstance(attr_value, attr_class):
                attr_dict[attr_name] = attr_value
    for obj in objs:
        for key, value in attr_dict.items():
            setattr(obj, key, value)
    return objs


share_attributes([TaskA, TaskB, TaskC], luigi.Parameter)


class TaskBWithDeps(TaskB):
    def requires(self):
        return TaskA(**self.param_kwargs)


class TaskCWithDeps(TaskC):
    def requires(self):
        return TaskBWithDeps(**self.param_kwargs)


if __name__ == "__main__":
    with open("my_config_file.yaml", "r", encoding="utf-8") as f:
        params = yaml.safe_load(f)
    luigi.build([TaskCWithDeps(**params)], local_scheduler=True)
This approach shouldn't cause any issues, as long as you're aware of scope.
Upvotes: 0
Reputation: 8290
I think this is largely an artifact of the abstract tasks you have here; in the real world, you probably need to know at each step where you are reading from and writing to. See for example:
import os

import luigi


class DecompressTask(luigi.Task):
    dirname = luigi.Parameter()
    filename = luigi.Parameter()

    def output(self):
        return luigi.LocalTarget(os.path.join(self.dirname, self.filename + ".txt"))

    def run(self):
        # decompress() stands in for the actual decompression logic
        decompress(os.path.join(self.dirname, self.filename + ".gz"),
                   os.path.join(self.dirname, self.filename + ".txt"))


class TranslateTask(luigi.Task):
    dirname = luigi.Parameter()
    filename = luigi.Parameter()

    def requires(self):
        return DecompressTask(dirname=self.dirname, filename=self.filename)

    def output(self):
        return luigi.LocalTarget(os.path.join(self.dirname, self.filename + ".translated"))

    def run(self):
        # translate() stands in for the actual translation logic
        translate(os.path.join(self.dirname, self.filename + ".txt"),
                  os.path.join(self.dirname, self.filename + ".translated"))
class ProcessDirectory(luigi.WrapperTask):
    dirname = luigi.Parameter()

    def requires(self):
        tasks = []
        for file_name in os.listdir(self.dirname):
            if file_name.endswith("gz"):
                prefix = file_name.split(".")[0]
                tasks.append(TranslateTask(filename=prefix, dirname=self.dirname))
        return tasks
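A wrapper task like this can then be launched the same way as in the question; for example, assuming these classes live in pipeline.py:

luigi --module pipeline ProcessDirectory --workers 3 --dirname some_dir_w_input_files/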
Upvotes: 0