Milla Well

Reputation: 3313

luigi dependencies change at runtime

I have a luigi preprocessing task that splits my raw data into smaller files. These files will then be processed by the actual pipeline.

Regarding the parameters, I would like to require one pipeline task per preprocessed file, with the file id as its parameter. However, this file id is only generated in the preprocessing step and is thus only known at runtime. To illustrate my idea, here is some non-working code:

import luigi
import subprocess 
import random


class GenPipelineFiles(luigi.Task):

    input_file = luigi.Parameter()

    def requires(self):
        pass

    def output(self):

        for i in range(random.randint(0,10)):
            yield luigi.LocalTarget("output/{}_{}.txt".format(self.input_file, i))

    def run(self):

        for iout in self.output:
            command = "touch {}".format(iout.fname)
            subprocess.call(command, shell=True)


class RunPipelineOnSmallChunk(luigi.Task):
    pass


class Experiment(luigi.WrapperTask):

    input_file = luigi.Parameter(default="ex")

    def requires(self):

        file_ids = GenPipelineFiles(input_file=self.input_file)

        for file_id in file_ids:
            yield RunPipelineOnSmallChunk(directory=self.input_file, file_id=file_id)


luigi.run()

The wrapper task Experiment should

  1. first, somehow require the splitting of the raw data into documents

  2. secondly, require the actual pipeline with the obtained file id of the preprocessing.

The random number of output files in the GenPipelineFiles indicates that this cannot be hard-coded into the Experiment's requires.

A probably related question: as far as I understand, a luigi task is properly meant to have only one input target and one output target. A note on how to model multiple outputs in GenPipelineFiles might therefore also solve the problem.

Upvotes: 3

Views: 1712

Answers (1)

MattMcKnight

Reputation: 8290

One simple approach to dealing with multiple outputs is to create a directory named after the input file and put the output files from the split into that directory. That way the dependent task can just check for the existence of the directory. Let's say I have an input file 123.txt. I then make a directory 123_split with files 1.txt, 2.txt, 3.txt as the output of GenPipelineFiles, and then a directory 123_processed with 1.txt, 2.txt, 3.txt as the output of RunPipelineOnSmallChunk.
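
As a rough sketch of that naming convention, using only the standard library and no luigi: the helper names split_dir_for and split_file, the lines_per_chunk parameter and the demo function are made up for illustration, and the chunking rule (a fixed number of lines per file) is only an assumption about how the split might work.

```python
import os
import tempfile


def split_dir_for(input_file):
    # Hypothetical helper: "123.txt" -> "123_split"
    prefix = os.path.splitext(os.path.basename(input_file))[0]
    return "{}_split".format(prefix)


def split_file(input_file, base_dir, lines_per_chunk=2):
    # Write chunks 1.txt, 2.txt, ... into <base_dir>/<prefix>_split
    out_dir = os.path.join(base_dir, split_dir_for(input_file))
    os.makedirs(out_dir, exist_ok=True)
    with open(input_file) as src:
        lines = src.readlines()
    chunk_paths = []
    starts = range(0, len(lines), lines_per_chunk)
    for n, start in enumerate(starts, start=1):
        path = os.path.join(out_dir, "{}.txt".format(n))
        with open(path, "w") as dst:
            dst.writelines(lines[start:start + lines_per_chunk])
        chunk_paths.append(path)
    return out_dir, chunk_paths


def demo():
    base = tempfile.mkdtemp()
    raw = os.path.join(base, "123.txt")
    with open(raw, "w") as f:
        f.write("a\nb\nc\nd\ne\n")
    out_dir, _ = split_file(raw, base)
    # A dependent step only checks that the directory exists and then
    # lists whatever chunk files it finds there.
    return (os.path.isdir(out_dir),
            os.path.basename(out_dir),
            sorted(os.listdir(out_dir)))
```

The point of the layout is that the number of chunks never has to be known in advance: the downstream step depends on one directory and discovers the files inside it.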

For your requires method in Experiment, you have to return the tasks you want to run, in a list for example. The way you have written file_ids = GenPipelineFiles(input_file=self.input_file) makes me think the run method of that object is not being called, because it is not being returned by the method.

Here's some sample code that works with targets on a per-file basis (but not on a task-per-file basis). I still think it is safer to have a single output target, either a directory or a sentinel file of some kind, to indicate that you are done. Atomicity is lost unless the task ensures each target is created.

PYTHONPATH=. luigi --module sampletask RunPipelineOnSmallChunk --local-scheduler

sampletask.py

import luigi
import os
import subprocess
import random


class GenPipelineFiles(luigi.Task):

    inputfile = luigi.Parameter()
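    # Class attribute: evaluated once at import time, so every instance in
    # this process sees the same number of targets
    num_targets = random.randint(0,10)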

    def requires(self):
        pass

    def get_prefix(self):
        return self.inputfile.split(".")[0]

    def get_dir(self):
        return "split_{}".format(self.get_prefix())

    def output(self):
        targets = []
        for i in range(self.num_targets):
            targets.append(luigi.LocalTarget("{}/{}_{}.txt".format(self.get_dir(), self.get_prefix(), i)))
        return targets

    def run(self):
        if not os.path.exists(self.get_dir()):
            os.makedirs(self.get_dir())
        for iout in self.output():
            command = "touch {}".format(iout.path)
            subprocess.call(command, shell=True)


class RunPipelineOnSmallChunk(luigi.Task):

    inputfile = luigi.Parameter(default="test")

    def get_prefix(self):
        return self.inputfile.split(".")[0]

    def get_dir(self):
        return "processed_{}".format(self.get_prefix())

    @staticmethod
    def clean_input_path(path):
        return path.replace("split", "processed")

    def requires(self):
        return GenPipelineFiles(self.inputfile)

    def output(self):
        targets = []
        for target in self.input():
            targets.append(luigi.LocalTarget(RunPipelineOnSmallChunk.clean_input_path(target.path)))
        return targets

    def run(self):
        if not os.path.exists(self.get_dir()):
            os.makedirs(self.get_dir())
        for iout in self.output():
            command = "touch {}".format(iout.path)
            subprocess.call(command, shell=True)
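
The sentinel-file idea mentioned before the sample code could look roughly like the following. This is a standard-library sketch under assumptions, not luigi code: the marker name _SUCCESS and the helpers write_chunks_with_sentinel, is_complete and demo are hypothetical. In a luigi task, output() could presumably return a single luigi.LocalTarget pointing at the marker.

```python
import os
import tempfile

SENTINEL = "_SUCCESS"  # hypothetical marker file name


def write_chunks_with_sentinel(out_dir, chunks):
    os.makedirs(out_dir, exist_ok=True)
    for i, text in enumerate(chunks):
        with open(os.path.join(out_dir, "{}.txt".format(i)), "w") as f:
            f.write(text)
    # Written last: if the marker exists, every chunk above was written.
    with open(os.path.join(out_dir, SENTINEL), "w") as f:
        f.write("{}\n".format(len(chunks)))


def is_complete(out_dir):
    # The only check a dependent step needs to make
    return os.path.exists(os.path.join(out_dir, SENTINEL))


def demo():
    out_dir = os.path.join(tempfile.mkdtemp(), "split_test")
    before = is_complete(out_dir)
    write_chunks_with_sentinel(out_dir, ["a", "b", "c"])
    return before, is_complete(out_dir), sorted(os.listdir(out_dir))
```

If the process dies halfway through the loop, the marker is never written, so a rerun sees the step as incomplete instead of treating a partial set of chunks as finished.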

Upvotes: 2
