I have a luigi preprocessing task that splits my raw data into smaller files. These files will then be processed by the actual pipeline.
As for parameters, I would like each pipeline to require one preprocessed file id as its parameter. However, this file id is only generated in the preprocessing step and is therefore only known at runtime. To illustrate my idea, here is some (non-working) code:
```python
import luigi
import subprocess
import random


class GenPipelineFiles(luigi.Task):
    input_file = luigi.Parameter()

    def requires(self):
        pass

    def output(self):
        for i in range(random.randint(0, 10)):
            yield luigi.LocalTarget("output/{}_{}.txt".format(self.input_file, i))

    def run(self):
        for iout in self.output:
            command = "touch {}".format(iout.fname)
            subprocess.call(command, shell=True)


class RunPipelineOnSmallChunk(luigi.Task):
    pass


class Experiment(luigi.WrapperTask):
    input_file = luigi.Parameter(default="ex")

    def requires(self):
        file_ids = GenPipelineFiles(input_file=self.input_file)
        for file_id in file_ids:
            yield RunPipelineOnSmallChunk(directory=self.input_file, file_id=file_id)


luigi.run()
```
The wrapper task `Experiment` should:

1. first, somehow require the splitting of the raw data into documents;
2. second, require the actual pipeline with each file id obtained from the preprocessing.

The random number of output files in `GenPipelineFiles` indicates that this cannot be hard-coded into `Experiment`'s `requires`.
A probably related question is the fact that a luigi task should properly have only one input target and one output target. So a note on how to model multiple outputs in `GenPipelineFiles` could also solve the problem.
One simple approach to dealing with multiple outputs is to create a directory named after the input file and put the files produced by the split into it. That way the dependent task can just check for the existence of the directory. Say I have an input file `123.txt`: I then make a directory `123_split` containing `1.txt`, `2.txt`, `3.txt` as the output of `GenPipelineFiles`, and a directory `123_processed` containing `1.txt`, `2.txt`, `3.txt` as the output of `RunPipelineOnSmallChunk`.
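A minimal stdlib-only sketch of that layout, assuming each chunk is simply a group of lines (the split rule, `lines_per_chunk`, and the helper names `split_dir_for` and `split_into_directory` are hypothetical, not luigi API). It writes the chunks into a temporary directory and renames that directory into place at the end, so a dependent task that only checks whether `123_split` exists never sees a half-finished split:

```python
import os
import shutil
import tempfile


def split_dir_for(input_path):
    """Map e.g. 'data/123.txt' to 'data/123_split' (hypothetical naming helper)."""
    prefix = os.path.splitext(os.path.basename(input_path))[0]
    return os.path.join(os.path.dirname(input_path), prefix + "_split")


def split_into_directory(input_path, lines_per_chunk=2):
    """Split input_path into 1.txt, 2.txt, ... inside <prefix>_split/.

    Chunks go into a temporary directory first; the final directory only
    appears (via rename) once every chunk has been written.
    """
    final_dir = split_dir_for(input_path)
    if os.path.isdir(final_dir):
        return final_dir  # already done: the directory itself is the "output"
    parent = os.path.dirname(final_dir) or "."
    tmp_dir = tempfile.mkdtemp(dir=parent, prefix=".tmp_split_")
    try:
        with open(input_path) as src:
            lines = src.readlines()
        for n, start in enumerate(range(0, len(lines), lines_per_chunk), start=1):
            with open(os.path.join(tmp_dir, "{}.txt".format(n)), "w") as dst:
                dst.writelines(lines[start:start + lines_per_chunk])
        os.rename(tmp_dir, final_dir)  # atomic on POSIX within one filesystem
    except BaseException:
        shutil.rmtree(tmp_dir, ignore_errors=True)  # never leave partial output
        raise
    return final_dir
```

Inside a luigi task, the same check could be expressed by returning a single `luigi.LocalTarget` for the directory from `output()`, since a local target's existence check is a plain path-existence check and is therefore true for a directory too.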
For your `requires` method in `Experiment`, you have to return (or yield) the tasks you want to run, for example in a list. The way you have written `file_ids = GenPipelineFiles(input_file=self.input_file)` makes me think the `run` method of that object is never called, because the task is not returned by the method.
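To make that point concrete without depending on luigi itself, here is a toy dependency walker. It is a hypothetical stand-in, *not* luigi's scheduler, but like luigi it only builds the tasks that `requires()` hands back. A task that is merely constructed inside `requires()`, as `GenPipelineFiles` is in the question, is never built:

```python
class ToyTask:
    """Hypothetical minimal task: no dependencies, run() just logs its name."""

    def requires(self):
        return []

    def run(self, log):
        log.append(type(self).__name__)


def build(task, log):
    """Depth-first: build everything task.requires() returns or yields, then run task."""
    deps = task.requires() or []
    if isinstance(deps, ToyTask):  # a single task instead of a list
        deps = [deps]
    for dep in deps:
        build(dep, log)
    task.run(log)


class Split(ToyTask):
    pass


class ExperimentDiscards(ToyTask):
    def requires(self):
        split = Split()  # constructed but never returned, so never built
        return []


class ExperimentReturns(ToyTask):
    def requires(self):
        yield Split()  # yielded, so the walker builds it first
```

Running `build(ExperimentDiscards(), log)` logs only `ExperimentDiscards`, while `build(ExperimentReturns(), log)` logs `Split` before `ExperimentReturns`.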
Here's some sample code that works with targets on a per-file basis (but not with one task per file). I still think it is safer to have a single output target, a directory or a sentinel file of some kind, to indicate that you are done: atomicity is lost unless the task ensures that every target is created.
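A sketch of the sentinel-file variant, again in plain Python with hypothetical names (`process_chunks`, `is_complete`, and a `_SUCCESS` marker borrowed from the convention Hadoop uses). The marker is written only after every chunk has been processed, so a single target pointing at it can stand for the whole batch; if a run dies halfway, the marker is missing and the batch is simply redone:

```python
import os

SENTINEL = "_SUCCESS"  # hypothetical marker name, borrowed from Hadoop's convention


def is_complete(out_dir):
    """The batch counts as done only if the marker exists, whatever else is there."""
    return os.path.exists(os.path.join(out_dir, SENTINEL))


def process_chunks(split_dir, out_dir):
    """Process every chunk in split_dir into out_dir, writing the marker last."""
    if is_complete(out_dir):
        return
    os.makedirs(out_dir, exist_ok=True)
    for name in sorted(os.listdir(split_dir)):
        src_path = os.path.join(split_dir, name)
        with open(src_path) as src, open(os.path.join(out_dir, name), "w") as dst:
            dst.write(src.read().upper())  # placeholder for the real per-chunk work
    # Written only after the loop: a crash above leaves no marker, so the
    # batch is rerun instead of being mistaken for finished.
    with open(os.path.join(out_dir, SENTINEL), "w"):
        pass
```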
Run it with:

```shell
PYTHONPATH=. luigi --module sampletask RunPipelineOnSmallChunk --local-scheduler
```

`sampletask.py`:
```python
import luigi
import os
import subprocess
import random


class GenPipelineFiles(luigi.Task):
    inputfile = luigi.Parameter()
    # Drawn once when the class is defined, so repeated calls to output()
    # return the same set of targets.
    num_targets = random.randint(0, 10)

    def requires(self):
        pass

    def get_prefix(self):
        return self.inputfile.split(".")[0]

    def get_dir(self):
        return "split_{}".format(self.get_prefix())

    def output(self):
        # One target per chunk: split_<prefix>/<prefix>_<i>.txt
        targets = []
        for i in range(self.num_targets):
            targets.append(luigi.LocalTarget("{}/{}_{}.txt".format(self.get_dir(), self.get_prefix(), i)))
        return targets

    def run(self):
        if not os.path.exists(self.get_dir()):
            os.makedirs(self.get_dir())
        for iout in self.output():
            subprocess.call(["touch", iout.path])


class RunPipelineOnSmallChunk(luigi.Task):
    inputfile = luigi.Parameter(default="test")

    def get_prefix(self):
        return self.inputfile.split(".")[0]

    def get_dir(self):
        return "processed_{}".format(self.get_prefix())

    @staticmethod
    def clean_input_path(path):
        return path.replace("split", "processed")

    def requires(self):
        return GenPipelineFiles(self.inputfile)

    def output(self):
        # One processed target per split target,
        # e.g. split_test/test_0.txt -> processed_test/test_0.txt
        targets = []
        for target in self.input():
            targets.append(luigi.LocalTarget(RunPipelineOnSmallChunk.clean_input_path(target.path)))
        return targets

    def run(self):
        if not os.path.exists(self.get_dir()):
            os.makedirs(self.get_dir())
        for iout in self.output():
            subprocess.call(["touch", iout.path])
```
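One caveat about `clean_input_path`: `str.replace` rewrites every occurrence of `split`, so an input file whose name itself contains `split` gets mangled paths. With `inputfile="splitter.txt"`, for example, `split_splitter/splitter_1.txt` would become `processed_processedter/processedter_1.txt`. A hypothetical helper (`processed_path` is not part of the code above) that rewrites only the `split_` prefix of the directory name could look like this:

```python
import os


def processed_path(split_path):
    """Map 'split_test/test_3.txt' to 'processed_test/test_3.txt'.

    Only the leading 'split_' of the containing directory's name is rewritten;
    the file name and any parent directories are left untouched.
    """
    directory, filename = os.path.split(split_path)
    parent, dirname = os.path.split(directory)
    if not dirname.startswith("split_"):
        raise ValueError("not inside a split_* directory: {!r}".format(split_path))
    new_dir = "processed_" + dirname[len("split_"):]
    return os.path.join(parent, new_dir, filename)
```

In `RunPipelineOnSmallChunk.output()` it would take the place of `clean_input_path(target.path)`.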