Reputation: 101
I have a data source and a Luigi pipeline that gets this data, processes it, and stores it in a file. The data source is updated from time to time, and I need the files generated by Luigi to be updated as well.
In this pipeline I call a method that runs a Luigi WrapperTask, which in turn requires the task I need to re-run. AnotherForceableTask is the task I need to re-run, so I inherit from it in ForceableTask and add an __init__ method that removes all of AnotherForceableTask's outputs to force the task to run.
The problem is that this approach only works the first time I run the pipeline. After that, Luigi no longer calls the __init__ method and goes straight to the complete() method of AnotherForceableTask, which reports that the task is already complete.
    import os
    import luigi

    class ForceableTask(AnotherForceableTask):
        def __init__(self, *args, **kwargs):
            super().__init__(*args, **kwargs)
            # Delete any existing outputs so the task looks incomplete.
            outputs = luigi.task.flatten(self.output())
            for out in outputs:
                if out.exists():
                    os.remove(out.path)

    class WrapperTask(luigi.WrapperTask):
        dataref = luigi.Parameter()
        process_list = luigi.ListParameter()

        def complete(self):
            return False

        def requires(self):
            yield ForceableTask(dataref=self.dataref,
                                process_list=self.process_list)

    def startPipeline(formatedDate, processes):
        buildResult = luigi.build([WrapperTask(dataref=formatedDate,
                                               process_list=processes)],
                                  local_scheduler=True,
                                  detailed_summary=True,
                                  workers=1)
        return buildResult
ForceableTask always receives the same parameters, so I understand that after the first run Luigi skips the __init__ method and goes straight to complete(). I could make the pipeline work by passing a random parameter to ForceableTask, which makes Luigi call __init__ again.
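The behavior described here is what you would expect if task instances are cached per class and parameter values, so that a second construction with the same parameters returns the existing object without calling __init__ again. The following is a toy sketch of that idea in plain Python, not Luigi's actual code; CachingMeta and ToyTask are hypothetical names used only for illustration.

```python
class CachingMeta(type):
    """Toy metaclass that reuses one instance per (class, parameters) pair."""

    _instances = {}

    def __call__(cls, **params):
        # Parameter values must be hashable for this simple key to work.
        key = (cls, tuple(sorted(params.items())))
        if key not in CachingMeta._instances:
            # __init__ runs only here, on the first construction for this key.
            CachingMeta._instances[key] = super().__call__(**params)
        return CachingMeta._instances[key]


class ToyTask(metaclass=CachingMeta):
    init_calls = 0  # counts how many times __init__ actually ran

    def __init__(self, **params):
        self.params = params
        ToyTask.init_calls += 1


first = ToyTask(dataref="2024-01-01")
second = ToyTask(dataref="2024-01-01")  # same parameters: cached instance is returned
third = ToyTask(dataref="2024-01-02")   # different parameters: a new instance is built
```

With a cache like this, any cleanup placed in __init__ runs once per distinct parameter set for the lifetime of the process, which matches the observation that a random parameter makes the cleanup run again.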
But I don't want to pass a random parameter just so Luigi runs my __init__ method in ForceableTask. Does anyone know a more elegant way to force Luigi to re-run a task? I know this is not how Luigi is meant to work, but I need those files to be up to date.
Upvotes: 1
Views: 886
Reputation: 1
You can override the complete method so that the task always runs.
    import luigi

    class MyAlwaysRunningTask(luigi.Task):
        def __init__(self, *args, **kwargs):
            super().__init__(*args, **kwargs)
            self.done = False

        def complete(self):
            # Always False, so Luigi never treats the task as complete.
            return self.done
Here, self.done is always set to False, which means the task is never complete.
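If you would rather keep the delete-the-outputs approach from the question, another option is to do the deletion in ordinary code that runs before every build instead of in __init__. Below is a minimal sketch, assuming the outputs are local files; remove_outputs is a hypothetical helper, and the temporary file only stands in for a real task output.

```python
import os
import tempfile


def remove_outputs(paths):
    """Delete every path that exists and return the list of removed paths."""
    removed = []
    for path in paths:
        if os.path.exists(path):
            os.remove(path)
            removed.append(path)
    return removed


# Small demonstration with a throwaway file standing in for a stale output.
with tempfile.TemporaryDirectory() as tmp:
    stale = os.path.join(tmp, "output.csv")
    with open(stale, "w") as fh:
        fh.write("old data")
    missing = os.path.join(tmp, "never_written.csv")

    removed = remove_outputs([stale, missing])  # the missing path is skipped
    still_there = os.path.exists(stale)
```

In the question's startPipeline, the paths could be collected the same way the original __init__ does, for example [out.path for out in luigi.task.flatten(task.output())] for the task instance you want to force, and passed to the helper just before calling luigi.build. Since this is a plain function call, it runs on every invocation regardless of whether the task object is reused.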
Upvotes: 0