RenanSchwyz

Reputation: 101

Force a task in Luigi to always run

I have a data source and a Luigi pipeline that gets this data, processes it, and stores it in a file. The data source is updated from time to time, and I need the files generated by Luigi to be updated as well.

In this pipeline I call a method that runs a Luigi WrapperTask, which in turn requires the task I need to re-run. That task is AnotherForceableTask. I subclass it as ForceableTask and define an __init__ method that removes all of AnotherForceableTask's outputs to force the task to run.

The problem is that this approach only works the first time I run the pipeline. After that, Luigi no longer calls the __init__ method and goes straight to the complete() method of AnotherForceableTask, which reports that the task is already complete.

import os

import luigi

# AnotherForceableTask is defined elsewhere in the pipeline (not shown).


class ForceableTask(AnotherForceableTask):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Delete every existing output so that complete() reports False
        # and Luigi schedules the task again.
        outputs = luigi.task.flatten(self.output())
        for out in outputs:
            if out.exists():
                os.remove(out.path)


class WrapperTask(luigi.WrapperTask):
    dataref = luigi.Parameter()
    process_list = luigi.ListParameter()

    def complete(self):
        # Never report the wrapper as done, so its requirements are always checked.
        return False

    def requires(self):
        yield ForceableTask(dataref=self.dataref,
                            process_list=self.process_list)


def startPipeline(formatedDate, processes):
    buildResult = luigi.build([WrapperTask(dataref=formatedDate,
                                           process_list=processes)],
                              local_scheduler=True,
                              detailed_summary=True,
                              workers=1)
    return buildResult

ForceableTask always receives the same parameters. Because of that, I understand that Luigi skips the __init__ method and goes straight to complete() after the first run. I could make this pipeline work by passing a random parameter to ForceableTask, which makes Luigi create a new instance and therefore run the __init__ method again.
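This matches how Luigi instantiates tasks: its task metaclass keeps a cache keyed on the task class plus its parameter values and, on a cache hit, returns the existing instance without calling __init__ again. The sketch below is a simplified, hypothetical stand-in for that cache (CachingMeta is not a Luigi name, and Luigi's real implementation handles more cases); the counter in __init__ stands in for the output-deleting code in ForceableTask.

```python
# Simplified stand-in for Luigi's per-parameter instance cache.
# CachingMeta is a hypothetical name, not Luigi's actual class.


class CachingMeta(type):
    """Return one shared instance per (class, constructor arguments)."""
    _cache = {}

    def __call__(cls, *args, **kwargs):
        key = (cls, args, tuple(sorted(kwargs.items())))
        if key not in CachingMeta._cache:
            # type.__call__ creates the object AND runs __init__
            CachingMeta._cache[key] = super().__call__(*args, **kwargs)
        # On a cache hit, __init__ is skipped entirely
        return CachingMeta._cache[key]


class ForceableTask(metaclass=CachingMeta):
    init_calls = 0

    def __init__(self, dataref, process_list):
        ForceableTask.init_calls += 1  # stands in for "delete the outputs"
        self.dataref = dataref
        self.process_list = process_list


# Tuples are used so the cache key is hashable
first = ForceableTask(dataref="2020-01-01", process_list=("a", "b"))
second = ForceableTask(dataref="2020-01-01", process_list=("a", "b"))
third = ForceableTask(dataref="2020-01-02", process_list=("a", "b"))

print(first is second, ForceableTask.init_calls)  # True 2
```

Changing any parameter value (here dataref) produces a new cache key and a fresh __init__ call, which is why a random parameter makes the output deletion happen again.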

But I don't want to add a random parameter just so that Luigi runs the __init__ method of ForceableTask. Does anyone know a more elegant way to force Luigi to re-run a task? I know this is not how Luigi is meant to work, but I need those files to be up to date.

Upvotes: 1

Views: 886

Answers (1)

HadiKutabi

Reputation: 1

You can override the complete() method so that the task always runs.

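import luigi


class MyAlwaysRunningTask(luigi.Task):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.done = False  # never set to True anywhere

    def complete(self):
        # Always False, so Luigi treats the task as incomplete on every build
        return self.done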

Here, self.done is set to False and never changed, so complete() always returns False and the task is never considered complete.
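This works even with Luigi's instance cache, because run() is only scheduled when complete() returns False, and a done flag that is never flipped keeps the same cached instance reporting itself as incomplete. A task that instead sets the flag at the end of run() would stay "complete" for as long as its cached instance lives. The toy model below illustrates both cases; toy_build, FlagTask and AlwaysRunningTask are hypothetical names, not Luigi code, and reusing one object stands in for Luigi handing back the cached instance.

```python
# Toy model only: mimics the one check that matters here (a task runs
# only when complete() is False). It is not Luigi's scheduler.


class FlagTask:
    """Hypothetical contrast: marks itself done at the end of run()."""

    def __init__(self):
        self.done = False
        self.runs = 0

    def complete(self):
        return self.done

    def run(self):
        self.runs += 1
        self.done = True  # stays True on the cached instance


class AlwaysRunningTask:
    """Mirrors the answer: done starts False and is never changed."""

    def __init__(self):
        self.done = False
        self.runs = 0

    def complete(self):
        return self.done

    def run(self):
        self.runs += 1


def toy_build(task):
    """Run the task only if it reports itself incomplete."""
    if not task.complete():
        task.run()


# Reuse one object per task across several "builds", as the cache would
flag_task = FlagTask()
always_task = AlwaysRunningTask()
for _ in range(3):
    toy_build(flag_task)
    toy_build(always_task)

print(flag_task.runs, always_task.runs)  # 1 3
```

Applied to the question, this would mean overriding complete() in ForceableTask instead of deleting outputs in __init__; whether AnotherForceableTask.run() can safely overwrite outputs that already exist is an assumption worth checking.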

Upvotes: 0
